分享

基于HBase的冠字号查询系统2--实现部分

问题导读:
1、冠字号查询系统功能包括哪些内容?
2、取款流程和存款流程代码如何实现?




1. 软件版本和部署
maven:3.3.9
jdk:1.7
Struts2:2.3.24.1
hibernate:4.3.6
spring:4.2.5
MySQL:5.1.34
Junit:4
Myeclipse:2014

Hadoop2.6.4
HBase1.1.2

源码下载:https://github.com/fansy1990/ssh_v3/releases

部署参考:基于SSH的HDFS文件web管理系统
http://www.aboutyun.com/forum.php?mod=viewthread&tid=18726


数据下载:   http://pan.baidu.com/s/1slGJ9jJ

请先参考上篇Blog:基于HBase的冠字号查询系统1--理论部分

2. 系统功能&核心实现
2.1 系统首页
index.png
冠字号查询系统,主要包括两方面功能:
1. 把原始数据通过MR流程从HDFS导入到HBase,提供通用接口;
2. 提供冠字号查询功能、提供存款、取款功能;

2.2 数据加载功能
index.png
数据加载功能是一个通用的HBase表数据导入功能;
用户只需提供原始数据路径(HDFS)、HBase表名(该表需要先存在)、列描述(参考前篇博客此参数解释)、字段分隔符、时间格式即可;

2.2.1 后台实现
[mw_shl_code=applescript,true]public void submitJob() {
                Map<String, Object> jsonMap = new HashMap<String, Object>();
                if (HadoopUtils.getMrLock().equals(MRLock.NOTLOCKED)) {// 没有锁,则可以提交代码
                        // 先加锁
                        HadoopUtils.setMrLock(MRLock.LOCKED);
                        // 清空MR任务缓存
                        HadoopUtils.initMRCache();

                        // 提交任务
                        new Thread(new Hdfs2HBaseRunnable(hdfsFile, tableName,
                                        colDescription, splitter, dateFormat)).start();

                        jsonMap.put("flag", "true");
                        jsonMap.put("jobId", HadoopUtils.getJobId());
                } else {
                        jsonMap.put("flag", "false");
                        jsonMap.put("msg", "已经在运行MR程序,请确认!");
                }
                Utils.write2PrintWriter(JSON.toJSONString(jsonMap));
                return;
        }[/mw_shl_code]

这里提供一个MRLock,加此锁是防止在提交任务后,任务正在运行,而有其他程序重复提交任务(监控会有问题);
同时,这里使用多线程,提交任务后,立即返回前台,前台接收返回的信息后,根据判断,是否弹窗监控任务进度:
[mw_shl_code=applescript,true]ret = callByAJax("hadoop/hadoop_submitJob.action",
                        {hdfsFile:hdfs,tableName:table,colDescription:colDescription,splitter:splitter,dateFormat:dateFormat})
        if(ret.flag=="false"){// 提交任务失败
                $.messager.alert('提示',ret.msg,'warn');
                return ;
        }
         $.messager.progress({
             title:'提示',
             msg:'导入数据中...',
             interval:0    //disable auto update progress value
         });
        // hadoop_submitJob.action 返回的ret中包含jobId , ret.jobId
        if(typeof(EventSource)!=="undefined")
          {
                console.info("jobId:"+ret.jobId);
          var source=new EventSource("hadoop/hadoop_getMRProgress.action"+"?jobId="+ ret.jobId );
          source.onmessage=function(event)
            {
                  console.info(event.data);
                  
                  // TODO 判断event.data indexOf error ,解析:后面的值,显示,同时提示任务错误
                  if(event.data.indexOf( "error")> -1){
                          source.close();
                          $.messager.progress('close');
                          $.messager.alert('提示',"任务运行失败!",'warn');
                  }
                  // TODO 判断 event.data 为success ,则提示任务成功, 其他清空则任务进度即可
                  if(event.data == "success"){
                          source.close();
                          $.messager.progress('close');
                          $.messager.alert('提示',"任务运行成功!",'warn');
                  }
                  var bar = $.messager.progress('bar');
                  bar.progressbar('setValue',  event.data);
                  
            };[/mw_shl_code]
注意这里的JobId的获取:

1) 在提交任务的时候把job变量设置到外部静态类中;
index.png
2)在Thread线程提交任务后,去获取jobId
index.png
需要注意的是,jobId只有在任务运行中才会被初始化,所以在提交任务后(thread运行中才初始化jobid);
3)while循环获取jobid:
[mw_shl_code=applescript,true]public static String getJobId() {
                long start = System.currentTimeMillis();
                while (noJobId()) {
                        try {
                                Thread.sleep(200);
                        } catch (InterruptedException e) {
                                e.printStackTrace();
                        }
                        log.info(" Getting job id ...");
                }
                long end = System.currentTimeMillis();

                log.info("获取jobId,耗时:" + (end - start) * 1.0 / 1000 + "s");
                return currJob.getJobID().toString();
        }[/mw_shl_code]
[mw_shl_code=applescript,true]private static boolean noJobId() {
                if (currJob == null || currJob.getJobID() == null)
                        return true;

                return false;
        }[/mw_shl_code]
同时,这里使用了HTML5的服务器发送事件,关于此技术请参考:http://www.w3school.com.cn/html5/html_5_serversentevents.asp ;

2.2.2 实例
提交任务后,实时反映任务运行进度:
index.png
后台日志:
index.png
运行成功:
index.png

2.3. 查询冠字号
index.png

这里查询冠字号包含两个部分:
1)随机生成:是指随机生成冠字号(根据个数可以生成不同个数冠字号,逗号分隔);
index.png
2) 查询,根据冠字号进行查询,如果hbase表中该记录的exist字段为1则说明存在,否则说明查询改冠字号为取出状态,则对应为疑似伪钞冠字号;
index.png

3) 详细查询:查询冠字号的所有信息,可以查询多个版本信息:
index.png
2.3.1后台实现
[mw_shl_code=applescript,true]/**
         * 检查给定的冠字号是否存在疑似伪钞冠字号
         *
         * @param stumbers
         * @return
         * @throws IllegalArgumentException
         * @throws IOException
         */
        public Map<String, String> checkStumbersExist(String stumbers)
                        throws IllegalArgumentException, IOException {
                String[] stumbersArr = StringUtils.split(stumbers, Utils.COMMA);
                Connection connection = HadoopUtils.getHBaseConnection();
                Table table = connection.getTable(TableName
                                .valueOf(Utils.IDENTIFY_RMB_RECORDS));
                Map<String, String> map = new HashMap<>();
                Get get = null;
                try {
                        List<Get> gets = new ArrayList<>();
                        for (String stumber : stumbersArr) {
                                get = new Get(stumber.trim().getBytes());
                                gets.add(get);
                        }
                        Result[] results = table.get(gets);
                        String exist;
                        StringBuffer existStr = new StringBuffer();
                        StringBuffer notExistStr = new StringBuffer();
                        for (int i = 0; i < results.length; i++) {
                                exist = new String(results.getValue(Utils.FAMILY,
                                                Utils.COL_EXIST));
                                if ("1".equals(exist)) {
                                        existStr.append(stumbersArr).append(Utils.COMMA);
                                } else if ("0".equals(exist)) {
                                        notExistStr.append(stumbersArr).append(Utils.COMMA);
                                } else {
                                        log.info("冠字号:" + stumbersArr + "值 exist字段值异常!");
                                }
                        }
                        if (existStr.length() > 0) {
                                map.put("exist", existStr.substring(0, existStr.length() - 1));
                        } else {
                                map.put("exist", "nodata");
                        }
                        if (notExistStr.length() > 0) {
                                map.put("notExist",
                                                notExistStr.substring(0, notExistStr.length() - 1));
                        } else {
                                map.put("notExist", "nodata");
                        }
                } catch (Exception e) {
                        e.printStackTrace();
                }
                return map;
        }[/mw_shl_code]
直接使用HBase的Table Java API实现即可;

获取给定rowkey以及版本数的记录,同样使用HBase 的Table Java API 即可实现
[mw_shl_code=applescript,true]/**
* 根据rowkey和版本个数查询数据
* @param tableName
* @param cfs
* @param rowkeys
* @param versions
* @return
* @throws IOException
*/
        public List<HBaseTableData> getTableCertainRowKeyData(String tableName,
                        String cfs, String rowkeys, int versions) throws IOException {
                String[] stumbersArr = StringUtils.split(rowkeys, Utils.COMMA);
                Connection connection = HadoopUtils.getHBaseConnection();
                Table table = connection.getTable(TableName
                                .valueOf(tableName));
                List<HBaseTableData> list = new ArrayList<>();
                Get get = null;
                try {
                        List<Get> gets = new ArrayList<>();
                        for (String stumber : stumbersArr) {
                                get = new Get(stumber.trim().getBytes());
                                get.setMaxVersions(versions);
                                gets.add(get);
                        }
                        Result[] results = table.get(gets);
                        Cell[] cells;
                        for (int i = 0; i < results.length; i++) {
                                cells = results.rawCells();

                                list.addAll(getHBaseTableDataListFromCells(cells));

                        }

                        return list;

                } catch (Exception e) {
                        e.printStackTrace();
                }

                return null;
        }[/mw_shl_code]

2.4 存款:
index.png
1)存款需要输入用户ID、银行、冠字号,当然也可以随机生成;
index.png
2)存取使用的是Table的checkAndPut 函数,关于此函数存储数据的一致性,参考:http://blog.csdn.net/fansy1990/article/details/51451583
index.png
由于AAAR5912的冠字号,其exist状态为1,说明HBase表中此冠字号为存储状态,不能再次存储,即发现了疑似伪钞的冠字号;

2.5 取款
index.png
1)取款同样有随机生成功能,类似上面:
index.png
当然,这里随机生成的只是用户和银行而已;
2) 取款:取款根据取款金额进行获取:
index.png
取款流程如下:

1)根据给定的取款冠字号个数num,随机查找冠字号(rowkey)对应的op_www:exist字段值为1的num*3条记录;

2)使用HBase.checkAndPut进行更新,把op_www:exist字段值更新为0,并返回更新后的rowkey,即冠字号;

3)如果在num*3条记录更新后,被更新的冠字号不足num条,则再次随机查找冠字号对应的op_www:exist字段值为1的记录,并更新,返回更新后的冠字号,直到返回的冠字号个数为num;

2.6 验证每秒500+查询
使用单个线程进行查询:
[mw_shl_code=applescript,true]package stumer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class ReadTest {
        
//        private static String FAMILY ="info";
        
        
        public static void main(String[] args) throws IOException {
                long size =10000;
                get(Utils.getConn(),Utils.generateRowKey(size));
        }
        
        public static void get(Connection connection,List<byte[]> rowkeys) throws IOException {
                System.out.println(new Date()+":开始读取记录...");
                long start =System.currentTimeMillis();
                Table table = connection.getTable(TableName.valueOf(Utils.TABLE));
                Get get = null ;
                long count =0;
                try{
                        for(byte[] rowkey :rowkeys){
                                count ++;
        //                        get = new Get(Bytes.toBytes(""));
                                get = new Get(rowkey);
                                table.get(get);
                                if(count%1000==0){
                                        System.out.println("count:"+count);
                                }
                        }
                long end = System.currentTimeMillis();
                System.out.println(new Date()+":"+rowkeys.size()+"条记录,读取耗时:"+(end-start)*1.0/1000+"s");
                }catch(Exception e){
                        
                }finally{
                        table.close();
                }
               
        }
        
        
}
[/mw_shl_code]
使用多线程查询:
[mw_shl_code=applescript,true]package stumer;

import java.util.Date;
import java.util.List;

import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Table;

public class ReadThread implements Runnable {

        private List<byte[]> rks;
        private Table table;
        public  ReadThread(Table table ,List<byte[]> rks) {
                this.table = table;
                this.rks = rks;
        }
        @Override
        public void run() {
                System.out.println(Thread.currentThread().getName()+" "+new Date()+":开始读取记录...");
                long start =System.currentTimeMillis();
                Get get = null ;
                long count =0;
                try{
                        for(byte[] rowkey :rks){
                                count ++;
        //                        get = new Get(Bytes.toBytes(""));
                                get = new Get(rowkey);
                                table.get(get);
                                if(count%1000==0){
                                        System.out.println(Thread.currentThread().getName()+" count:"+count);
                                }
                        }
                long end = System.currentTimeMillis();
                System.out.println(Thread.currentThread().getName()+" "+new Date()
                                +":"+rks.size()+"条记录,读取耗时:"+(end-start)*1.0/1000+"s");
                }catch(Exception e){
                        
                }
               
        }

}
[/mw_shl_code]
多线程查询主程序:
[mw_shl_code=applescript,true]package stumer;

import java.io.IOException;

public class ReadThreadTest {

        public static void main(String[] args) throws IOException {
                long dataSize =500;
                int threadSize = 20;
                for(int i=0;i<threadSize;i++){
                        new Thread(new ReadThread(Utils.getTable(), Utils.generateRowKey(dataSize))).start();
                }
        }
}
[/mw_shl_code]
工程类Utils程序
[mw_shl_code=applescript,true]package stumer;

import java.io.IOException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class Utils {
        public static String TABLE = "records";
        private static DecimalFormat df = new DecimalFormat( "0000" );
        public static String[]  crownSizePrefixes =null;
        static Random random = new Random();

        static {
                crownSizePrefixes = new String[26*2];
                for (int i = 0; i < crownSizePrefixes.length/2; i++) {
                        crownSizePrefixes = "AAA" + (char) (65 + i);
                        crownSizePrefixes[i+26] = "AAB" + (char) (65 + i);               
                }
        }
        /**
         * 把0~9999 转为 0000~9999
         * @param num
         * @return
         */
        public static String formatCrownSizeSuffix(int num){
                return df.format(num);
        }
        public static Table getTable() throws IOException{
                return getConn().getTable(TableName.valueOf(TABLE));
        }
        public static String getRandomCrownSize(){
                return crownSizePrefixes[random.nextInt(crownSizePrefixes.length)]
                                +formatCrownSizeSuffix(random.nextInt(10000));
        }
        public static Connection getConn() throws IOException {
                Configuration conf = HBaseConfiguration.create();
                conf.set("hbase.master", "node2:16000");// 指定HMaster
                conf.set("hbase.rootdir", "hdfs://node1:8020/hbase");// 指定HBase在HDFS上存储路径
                conf.set("hbase.zookeeper.quorum", "node2,node3,node4");// 指定使用的Zookeeper集群
                conf.set("hbase.zookeeper.property.clientPort", "2181");// 指定使用Zookeeper集群的端口

                Connection connection = ConnectionFactory.createConnection(conf);// 获取连
                return connection;
        }
        
        public static List<byte[]> generateRowKey(long size){
                System.out.println(new Date()+"开始生成"+size +"条记录...");
                long start =System.currentTimeMillis();
                List<byte[]>  rowkeys = new ArrayList<>();
               
                for(int i=0;i<size;i++){
                        rowkeys.add(Bytes.toBytes(Utils.getRandomCrownSize()));
                }
                long end =System.currentTimeMillis();
                System.out.println(new Date()+":"+rowkeys.size()+"条记录,生成耗时:"+(end-start)*1.0/1000+"s");
                return rowkeys;
        }
}
[/mw_shl_code]

3. 总结
1) 基于冠字号查询系统基于已存在HBase的冠字号非伪钞,如果已存在冠字号包含伪钞,则存储和取钱功能都会有问题;
2) 原始数据(用户信息、冠字号交易信息),在一定程序上是有规律的,并且对于大数据来说,还是小数据,需要在较大数据集上测试;
3)用户账号相关信息(存储的钱总数等)并没有在该系统中体现,后续可以考虑;
4) 查询冠字号、存、取款功能在第一次点击的时候初始化时间较长,考虑弹窗显示;
5)查询冠字号、存、取款功能中的详细查询可以在定制点,比如可以只查询出某个列簇或列的数据即可;

来源:http://blog.csdn.net/fansy1990


已有(4)人评论

跳转到指定楼层
hahaxixi 发表于 2016-6-7 21:25:32
nice~~~
回复

使用道具 举报

xuliang123789 发表于 2016-6-8 09:50:28
谢谢楼主,学习了,赞~~
回复

使用道具 举报

springclan 发表于 2016-7-25 23:31:01
简单直接,手把手
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条