分享

大数据应用之HBase数据插入性能优化之多线程并行插入测试案例

问题导读:
1、单线程下HBase的插入性能如何?
2、如何在多线程下了解HBase的性能?





一、引言:
  关于HBase插入性能优化设计到的五个参数,从参数配置的角度给大家提供了一个性能测试环境的实验代码。根据网友的反馈,基于单线程的模式实现的数据插入毕竟有限。通过个人实测,在我的虚拟机环境下,单线程插入数据的值约为4w/s。集群指标是:CPU双核1.83,虚拟机512M内存,集群部署单点模式。本文给出了基于多线程并发模式的,测试代码案例和实测结果,希望能给大家一些启示:


二、源程序:

  1. import org.apache.hadoop.conf.Configuration;
  2. import org.apache.hadoop.hbase.HBaseConfiguration;
  3. import java.io.BufferedReader;
  4. import java.io.File;
  5. import java.io.FileNotFoundException;
  6. import java.io.FileReader;
  7. import java.io.IOException;
  8. import java.util.ArrayList;
  9. import java.util.List;
  10. import java.util.Random;
  11. import org.apache.hadoop.conf.Configuration;
  12. import org.apache.hadoop.hbase.HBaseConfiguration;
  13. import org.apache.hadoop.hbase.client.HBaseAdmin;
  14. import org.apache.hadoop.hbase.client.HTable;
  15. import org.apache.hadoop.hbase.client.HTableInterface;
  16. import org.apache.hadoop.hbase.client.HTablePool;
  17. import org.apache.hadoop.hbase.client.Put;
  18. public class HBaseImportEx {
  19.     static Configuration hbaseConfig = null;
  20.     public static HTablePool pool = null;
  21.     public static String tableName = "T_TEST_1";
  22.     static{
  23.          //conf = HBaseConfiguration.create();
  24.          Configuration HBASE_CONFIG = new Configuration();
  25.          HBASE_CONFIG.set("hbase.master", "192.168.230.133:60000");
  26.          HBASE_CONFIG.set("hbase.zookeeper.quorum", "192.168.230.133");
  27.          HBASE_CONFIG.set("hbase.zookeeper.property.clientPort", "2181");
  28.          hbaseConfig = HBaseConfiguration.create(HBASE_CONFIG);
  29.             
  30.          pool = new HTablePool(hbaseConfig, 1000);
  31.     }
  32.     /*
  33.      * Insert Test single thread
  34.      * */
  35.     public static void SingleThreadInsert()throws IOException
  36.     {
  37.         System.out.println("---------开始SingleThreadInsert测试----------");
  38.         long start = System.currentTimeMillis();
  39.         //HTableInterface table = null;
  40.         HTable table = null;
  41.         table = (HTable)pool.getTable(tableName);
  42.         table.setAutoFlush(false);
  43.         table.setWriteBufferSize(24*1024*1024);
  44.         //构造测试数据
  45.         List<Put> list = new ArrayList<Put>();
  46.         int count = 10000;
  47.         byte[] buffer = new byte[350];
  48.         Random rand = new Random();
  49.         for(int i=0;i<count;i++)
  50.         {
  51.             Put put = new Put(String.format("row %d",i).getBytes());
  52.             rand.nextBytes(buffer);
  53.             put.add("f1".getBytes(), null, buffer);
  54.             //wal=false
  55.             put.setWriteToWAL(false);
  56.             list.add(put);
  57.             if(i%10000 == 0)
  58.             {
  59.                 table.put(list);
  60.                 list.clear();
  61.                 table.flushCommits();
  62.             }         
  63.         }
  64.         long stop = System.currentTimeMillis();
  65.         //System.out.println("WAL="+wal+",autoFlush="+autoFlush+",buffer="+writeBuffer+",count="+count);
  66.             
  67.         System.out.println("插入数据:"+count+"共耗时:"+ (stop - start)*1.0/1000+"s");
  68.            
  69.         System.out.println("---------结束SingleThreadInsert测试----------");
  70.     }
  71.     /*
  72.      * 多线程环境下线程插入函数
  73.      *
  74.      * */
  75.     public static void InsertProcess()throws IOException
  76.     {
  77.         long start = System.currentTimeMillis();
  78.         //HTableInterface table = null;
  79.         HTable table = null;
  80.         table = (HTable)pool.getTable(tableName);
  81.         table.setAutoFlush(false);
  82.         table.setWriteBufferSize(24*1024*1024);
  83.         //构造测试数据
  84.         List<Put> list = new ArrayList<Put>();
  85.         int count = 10000;
  86.         byte[] buffer = new byte[256];
  87.         Random rand = new Random();
  88.         for(int i=0;i<count;i++)
  89.         {
  90.             Put put = new Put(String.format("row %d",i).getBytes());
  91.             rand.nextBytes(buffer);
  92.             put.add("f1".getBytes(), null, buffer);
  93.             //wal=false
  94.             put.setWriteToWAL(false);
  95.             list.add(put);
  96.             if(i%10000 == 0)
  97.             {
  98.                 table.put(list);
  99.                 list.clear();
  100.                 table.flushCommits();
  101.             }         
  102.         }
  103.         long stop = System.currentTimeMillis();
  104.         //System.out.println("WAL="+wal+",autoFlush="+autoFlush+",buffer="+writeBuffer+",count="+count);
  105.             
  106.         System.out.println("线程:"+Thread.currentThread().getId()+"插入数据:"+count+"共耗时:"+ (stop - start)*1.0/1000+"s");
  107.     }
  108.       
  109.       
  110.     /*
  111.      * Mutil thread insert test
  112.      * */
  113.     public static void MultThreadInsert() throws InterruptedException
  114.     {
  115.         System.out.println("---------开始MultThreadInsert测试----------");
  116.         long start = System.currentTimeMillis();
  117.         int threadNumber = 10;
  118.         Thread[] threads=new Thread[threadNumber];
  119.         for(int i=0;i<threads.length;i++)
  120.         {
  121.             threads[i]= new ImportThread();
  122.             threads[i].start();      
  123.         }
  124.         for(int j=0;j< threads.length;j++)
  125.         {
  126.              (threads[j]).join();
  127.         }
  128.         long stop = System.currentTimeMillis();
  129.             
  130.         System.out.println("MultThreadInsert:"+threadNumber*10000+"共耗时:"+ (stop - start)*1.0/1000+"s");      
  131.         System.out.println("---------结束MultThreadInsert测试----------");
  132.     }
  133.     /**
  134.      * @param args
  135.      */
  136.     public static void main(String[] args)  throws Exception{
  137.         // TODO Auto-generated method stub
  138.         //SingleThreadInsert();   
  139.         MultThreadInsert();
  140.            
  141.            
  142.     }
  143.       
  144.     public static class ImportThread extends Thread{
  145.         public void HandleThread()
  146.         {                     
  147.             //this.TableName = "T_TEST_1";
  148.            
  149.                
  150.         }
  151.         //
  152.         public void run(){
  153.             try{
  154.                 InsertProcess();         
  155.             }
  156.             catch(IOException e){
  157.                 e.printStackTrace();              
  158.             }finally{
  159.                 System.gc();
  160.                 }
  161.             }         
  162.         }
  163. }
复制代码


三、说明
1.线程数设置需要根据本集群硬件参数,实际测试得出。否则线程过多的情况下,总耗时反而是下降的。
2.单笔提交数对性能的影响非常明显,需要在自己的环境下,找到最理想的数值,这个需要与单条记录的字节数相关。


四、测试结果
---------开始MultThreadInsert测试----------
线程:8插入数据:10000共耗时:1.328s
线程:16插入数据:10000共耗时:1.562s
线程:11插入数据:10000共耗时:1.562s
线程:10插入数据:10000共耗时:1.812s
线程:13插入数据:10000共耗时:2.0s
线程:17插入数据:10000共耗时:2.14s
线程:14插入数据:10000共耗时:2.265s
线程:9插入数据:10000共耗时:2.468s
线程:15插入数据:10000共耗时:2.562s
线程:12插入数据:10000共耗时:2.671s
MultThreadInsert:100000共耗时:2.703s
---------结束MultThreadInsert测试----------




最后,感谢原作者的无私分享:51cto

本帖被以下淘专辑推荐:

已有(4)人评论

跳转到指定楼层
Joker 发表于 2015-5-13 14:04:28
循环的时候,需要从1开始,<=,否则在 %的部分是不能进入的。导致只有一条数据插入到HBase


还有个疑问是,当某一个线程达到插入10000条数据的时候,其它线程不终止,还在插入。需要控制。
对线程没有太多了解,需要学习。LZ有方法也可以共享下
回复

使用道具 举报

tang 发表于 2015-6-10 20:55:37
回复

使用道具 举报

zhangzh 发表于 2015-8-11 17:19:38
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条