
hive-udf-kafka: bulk-importing Hive data into Kafka

Background: the data lives in Hive and needs to be loaded into Kafka. To cut out intermediate steps, a custom UDF is used to push the Hive data straight into Kafka.


Problem: a UDF processes one row at a time, while a bulk import involves many rows. How can multiple rows be handed to a single UDF call?
Approach: use collect_list to aggregate the rows of each group into one array, then loop over that array inside the UDF and send each element to Kafka.
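For example, collect_list combined with map folds all rows of a group into a single array<map<string,string>>, which is exactly the type the UDF's third argument expects. A minimal illustration using the same columns as the test SQL further down:

    -- Without a GROUP BY, the whole table collapses into one array; the test SQL
    -- below buckets rows into groups first so each UDF call handles a smaller batch.
    SELECT collect_list(map('full_name', full_name, 'simple_name', simple_name))
    FROM dws_bo_final_spider_contact;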

package cn.kobold;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.*;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.IntWritable;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.json.JSONObject;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

@Description(name = "hive2kafka", value = "_FUNC_(brokerhost_and_port, topic, array<map<string,string>>) - returns 1 once the batch has been sent to Kafka")
public class Hive2KakfaUDF extends GenericUDF {

    private String hostAndPort;
    private String topics;
    private StandardListObjectInspector paramsListInspector;
    private StandardMapObjectInspector paramsElementInspector;

    @Override
    public ObjectInspector initialize(ObjectInspector[] arg0) throws UDFArgumentException {
        if (arg0.length != 3) {
            throw new UDFArgumentException("Expecting three arguments: <brokerhost:port> <topic> array<map<string,string>>");
        }
        // First argument: broker list, must be a string constant
        if (arg0[0].getCategory() == Category.PRIMITIVE
                && ((PrimitiveObjectInspector) arg0[0]).getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING) {
            if (!(arg0[0] instanceof ConstantObjectInspector)) {
                throw new UDFArgumentException("broker host:port must be constant");
            }
            ConstantObjectInspector brokerhost_and_port = (ConstantObjectInspector) arg0[0];
            hostAndPort = brokerhost_and_port.getWritableConstantValue().toString();
        }
        // Second argument: Kafka topic, must be a string constant
        if (arg0[1].getCategory() == Category.PRIMITIVE
                && ((PrimitiveObjectInspector) arg0[1]).getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING) {
            if (!(arg0[1] instanceof ConstantObjectInspector)) {
                throw new UDFArgumentException("kafka topic must be constant");
            }
            ConstantObjectInspector topicCOI = (ConstantObjectInspector) arg0[1];
            topics = topicCOI.getWritableConstantValue().toString();
        }
        // Third argument: must be an array<map<string,string>>
        if (arg0[2].getCategory() != Category.LIST) {
            throw new UDFArgumentException("Expecting an array<map<string,string>> field as third argument");
        }
        ListObjectInspector third = (ListObjectInspector) arg0[2];
        if (third.getListElementObjectInspector().getCategory() != Category.MAP) {
            throw new UDFArgumentException("Expecting an array<map<string,string>> field as third argument");
        }
        paramsListInspector = ObjectInspectorFactory.getStandardListObjectInspector(third.getListElementObjectInspector());
        paramsElementInspector = (StandardMapObjectInspector) third.getListElementObjectInspector();
        // Debug output: categories of the map's key and value inspectors
        System.out.println(paramsElementInspector.getMapKeyObjectInspector().getCategory());
        System.out.println(paramsElementInspector.getMapValueObjectInspector().getCategory());
        // The UDF returns an int once the batch has been sent
        return PrimitiveObjectInspectorFactory.writableIntObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] arg0) throws HiveException {
        Properties props = new Properties();
        props.put("bootstrap.servers", hostAndPort);
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Create the Kafka producer (one producer per UDF call, i.e. per group)
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 0; i < paramsListInspector.getListLength(arg0[2].get()); i++) {
            Object row = paramsListInspector.getListElement(arg0[2].get(), i);
            Map<?, ?> map = paramsElementInspector.getMap(row);
            // Object obj = ObjectInspectorUtils.copyToStandardJavaObject(row, paramsElementInspector);
            // Copy into a plain java.util.Map; otherwise the keys and values are Hadoop Writable objects
            Map<String, String> data = new HashMap<String, String>();
            for (Map.Entry<?, ?> entry : map.entrySet()) {
                if (entry.getValue() != null && !"".equals(entry.getValue().toString())) {
                    data.put(entry.getKey().toString(), entry.getValue().toString());
                }
            }
            JSONObject jsonObject = new JSONObject(data);
            // Spread the records evenly across the topic's 3 partitions;
            // the modulus must match the topic's actual partition count
            int part = i % 3;
            producer.send(new ProducerRecord<String, String>(topics, part, Integer.toString(i), jsonObject.toString()));
        }
        producer.close();
        return new IntWritable(1);
    }

    @Override
    public String getDisplayString(String[] strings) {
        return "hive2kafka(brokerhost_and_port, topic, array<map<string,string>>)";
    }
}
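Before running the test SQL, the compiled jar has to be added to the Hive session and the UDF registered under the name used below. A minimal sketch, assuming the jar is built as hive-udf-kafka.jar (the path is illustrative):

    ADD JAR /path/to/hive-udf-kafka.jar;
    CREATE TEMPORARY FUNCTION hive2kafka AS 'cn.kobold.Hive2KakfaUDF';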
Test SQL
First argument: the broker address (host:port).
Second argument: the Kafka topic.
Third argument: the rows of each group collected into an array, with each row converted to a map.
The inner query buckets the 10,000 sampled rows into groups with pmod(ABS(hash(r1)), 1000), so each GROUP BY g group, and therefore each UDF call, only has to send a small batch of rows to Kafka.

SELECT g,
       hive2kafka('bd01:9092', 'bobizli_test',
                  collect_list(map('full_name', full_name, 'simple_name', simple_name))) AS result
FROM (
    SELECT r1, pmod(ABS(hash(r1)), 1000) AS g, full_name, simple_name
    FROM (
        SELECT row_number() OVER (PARTITION BY 1) AS r1, full_name, simple_name
        FROM dws_bo_final_spider_contact
        LIMIT 10000
    ) tmp
) tmp2
GROUP BY g;
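Each group g results in one UDF call. For every row in the group, the producer sends a record to bobizli_test whose key is the row's index within the group and whose value is the row's map serialized as JSON, i.e. values of the form {"full_name":"...","simple_name":"..."}.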




