分享

生成HFile文件报错

今天执行生成HFile代码时报java.lang.ClassNotFoundException: Class com.it.hbase.IteblogBulkLoadDriver$HFileImportMapper2 not found 类找不到的问题。但是代码中有这个类。所以就搞不懂了。代码以前应该执行过,现在新搭建的环境不知道什么问题。我看后台进程服务都有启。
代码:

public class GenerateHFileMain {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        final String INPUT_PATH = "hdfs://xxxxx:9000/input";
        final String OUTPUT_PATH = "hdfs://xxxxx:9000/output";

        Configuration conf = HBaseConfiguration.create();
        conf.addResource("core-site.xml");
        conf.addResource("hbase-site.xml");
        conf.addResource("hdfs-site.xml");
        conf.addResource("hive-site.xml");
        conf.addResource("mapred-site.xml");
        conf.addResource("yarn-site.xml");
        conf.addResource("adtec-config.xml");

        Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TableName.valueOf("dws_tfc_state_inter_tp_index_d"));
        Job job = Job.getInstance(conf);
        job.setJarByClass(GenerateHFileMain.class);
        job.setMapperClass(GenerateHFile.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        job.setOutputFormatClass(HFileOutputFormat2.class);
        HFileOutputFormat2.configureIncrementalLoad(job, table, connection.getRegionLocator(TableName.valueOf("dws_tfc_state_inter_tp_index_d")));
        FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    static class GenerateHFile extends Mapper<LongWritable,
            Text, ImmutableBytesWritable, Put> {

        private static Logger log = Logger.getLogger(GenerateHFile.class);

        private long checkpoint = 10000L;//执行过程统计打印百分比进度
        private long count = 0L;//
        private String columns = null;//HBASE表字段配置信息
        private byte[] family = null;//列簇
        private int colLen = -1;//列长度
        private byte[][] cols = null;//列集合
        private boolean rowKeyReverse = false;//ROWKEY反转标识

        /**
         * 随机生成字母数字的混合
         *
         * @param length 生成长度
         * @return 字符串
         */
        public static String randomMix(int length) {
            StringBuilder sb = new StringBuilder();
            Random random = new Random();
            for (int i = 0; i < length; i++) {
                String charOrNum = random.nextInt(2) % 2 == 0 ? "char" : "num";
                if ("char".equalsIgnoreCase(charOrNum)) {
                    int temp = random.nextInt(2) % 2 == 0 ? 65 : 97;
                    sb.append((char) (random.nextInt(26) + temp));
                } else if ("num".equalsIgnoreCase(charOrNum)) {
                    sb.append(String.valueOf(random.nextInt(10)));
                }
            }
            return sb.toString();
        }

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            columns = context.getConfiguration().get("ztesoft.day.columns");
            family = context.getConfiguration().get("ztesoft.day.family").getBytes();

            String colNames[] = columns.split(",", -1);
            colLen = colNames.length;
            cols = new byte[colLen][];

            for (int i = 0; i < colLen; i++) {
                cols = Bytes.toBytes(colNames);
            }

            log.info("@BulkLoadMapper.setup,loadInfo:" + columns);
            log.info("@BulkLoadMapper.setup,family:" + family);
            log.info("@BulkLoadMapper.setup,rowKeyReverse:" + rowKeyReverse);
            log.info("@BulkLoadMapper.setup,colLen:" + colLen);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
           /* String line = value.toString();
            String[] items = line.split("\t");

            String ROWKEY = items[1] + items[2] + items[3];
            ImmutableBytesWritable rowkey = new ImmutableBytesWritable(ROWKEY.getBytes());
            Put put = new Put(ROWKEY.getBytes());   //ROWKEY
            put.addColumn("INFO".getBytes(), "URL".getBytes(), items[0].getBytes());
            put.addColumn("INFO".getBytes(), "SP".getBytes(), items[1].getBytes());  //出发点
            put.addColumn("INFO".getBytes(), "EP".getBytes(), items[2].getBytes());  //目的地
            put.addColumn("INFO".getBytes(), "ST".getBytes(), items[3].getBytes());   //出发时间
            put.addColumn("INFO".getBytes(), "PRICE".getBytes(), Bytes.toBytes(Integer.valueOf(items[4])));  //价格
            put.addColumn("INFO".getBytes(), "TRAFFIC".getBytes(), items[5].getBytes());//交通方式
            put.addColumn("INFO".getBytes(), "HOTEL".getBytes(), items[6].getBytes()); //酒店

            context.write(rowkey, put);*/

            String[] vals = value.toString().split(",", -1);//读取一行数据转换成数组

            String suffixRowKey = new StringBuffer(vals[0].trim()).reverse().append(vals[2].trim()).append(vals[1].trim()).toString();
            String rowKey = randomMix(4) + suffixRowKey;
            System.out.println("rowKey : " + rowKey);
            Put row = new Put(Bytes.toBytes(rowKey));//新建PUT
            for (int i = 3; i < vals.length; i++) {
                row.addColumn(family, cols[i - 3], Bytes.toBytes(vals));
            }

            try {
                context.write(new ImmutableBytesWritable(Bytes.toBytes(rowKey)), row); //写文件到HBASE
            } catch (InterruptedException e) {
                log.error("写入HBASE失败!数据:" + value.toString() + e.getMessage(), e);
            }

            System.out.println("==========================");
            if (++count % checkpoint == 0) {
                //显示写入HBASE进度情况
                context.setStatus("Emitting Put " + this.count);
            }
        }
    }
}

有谁知道是什么问题吗

后台进程

后台进程
360截图168201208392114.png

已有(1)人评论

跳转到指定楼层
s060403072 发表于 2019-4-26 19:33:32
那就不是代码的问题。找找其它的原因。比如权限,能否访问到,打包是否有问题等
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条