Java code examples: parquet.hadoop.ParquetWriter

Posted by arsenduan, 2016-8-12 18:08 | tags: java



Example 1
import java.io.File;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.FilenameUtils;
import org.apache.hadoop.fs.Path;

// These imports use the pre-Apache parquet-mr package names implied by the title;
// newer releases live under org.apache.parquet.*
import parquet.avro.AvroParquetWriter;
import parquet.hadoop.ParquetWriter;
import parquet.hadoop.metadata.CompressionCodecName;

public class AvroToParquet {

    public static void main(String[] args) {

        String inputFile = null;
        String outputFile = null;
        HelpFormatter formatter = new HelpFormatter();
        // create Options object
        Options options = new Options();

        // add the -i and -o options
        options.addOption("i", true, "input Avro file");
        options.addOption("o", true, "output Parquet file");
        CommandLineParser parser = new DefaultParser();
        CommandLine cmd;
        try {
            cmd = parser.parse(options, args);
            inputFile = cmd.getOptionValue("i");
            if (inputFile == null) {
                formatter.printHelp("AvroToParquet", options);
                return;
            }
            outputFile = cmd.getOptionValue("o");
        } catch (ParseException exc) {
            System.err.println("Problem with command line parameters: " + exc.getMessage());
            return;
        }

        File avroFile = new File(inputFile);

        if (!avroFile.exists()) {
            System.err.println("Could not open file: " + inputFile);
            return;
        }
        try {
            // read the Avro file and pick up the schema embedded in it
            DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
            DataFileReader<GenericRecord> dataFileReader;
            dataFileReader = new DataFileReader<GenericRecord>(avroFile, datumReader);
            Schema avroSchema = dataFileReader.getSchema();

            // choose compression scheme
            CompressionCodecName compressionCodecName = CompressionCodecName.SNAPPY;

            // set Parquet file block size and page size values
            int blockSize = 256 * 1024 * 1024;
            int pageSize = 64 * 1024;

            // default to <input name>.parquet unless -o was given
            String base = FilenameUtils.removeExtension(avroFile.getAbsolutePath()) + ".parquet";
            if (outputFile != null) {
                File file = new File(outputFile);
                base = file.getAbsolutePath();
            }
            Path outputPath = new Path("file:///" + base);

            // the ParquetWriter object that will consume Avro GenericRecords
            ParquetWriter<GenericRecord> parquetWriter;
            parquetWriter = new AvroParquetWriter<GenericRecord>(
                    outputPath, avroSchema, compressionCodecName, blockSize, pageSize);
            for (GenericRecord record : dataFileReader) {
                parquetWriter.write(record);
            }
            dataFileReader.close();
            parquetWriter.close();
        } catch (IOException e) {
            System.err.println("Caught exception: " + e.getMessage());
        }
    }
}
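
To sanity-check the file Example 1 produces, the records can be streamed back with AvroParquetReader. A minimal sketch, assuming the same outputPath and the same-era constructor API (newer releases use a builder instead):

AvroParquetReader<GenericRecord> reader = new AvroParquetReader<GenericRecord>(outputPath);
GenericRecord record;
while ((record = reader.read()) != null) {
    System.out.println(record);
}
reader.close();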
 
Example 2

// T and schema are assumed to come from the enclosing class, e.g.
// class AvroToParquetConverter<T extends IndexedRecord> holding an Avro Schema field.
public void toParquet(InputStream inputStream, String outputFile) throws IOException {
    DatumReader<T> datumReader = new GenericDatumReader<>(schema);
    DataFileStream<T> dataFileStream = new DataFileStream<>(inputStream, datumReader);

    // load your Avro schema
    Schema avroSchema = dataFileStream.getSchema();

    // generate the corresponding Parquet schema
    MessageType parquetSchema = new AvroSchemaConverter().convert(avroSchema);

    // create a WriteSupport object to serialize your Avro objects
    WriteSupport<IndexedRecord> writeSupport = new AvroWriteSupport(parquetSchema, avroSchema);

    // choose compression scheme
    CompressionCodecName compressionCodecName = CompressionCodecName.SNAPPY;

    // set Parquet file block size and page size values
    int blockSize = 256 * 1024 * 1024;
    int pageSize = 64 * 1024;

    // the ParquetWriter object that will consume Avro GenericRecords
    ParquetWriter<IndexedRecord> parquetWriter = new ParquetWriter<>(
            new org.apache.hadoop.fs.Path(outputFile),
            writeSupport, compressionCodecName, blockSize, pageSize);

    int numRecords = 0;
    T reuse = null;

    while (dataFileStream.hasNext()) {
        reuse = dataFileStream.next(reuse);
        parquetWriter.write(reuse);
        if (numRecords % 1000 == 0) {
            System.out.println(numRecords);
        }
        numRecords++;
    }
    dataFileStream.close();
    parquetWriter.close();
}
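
The multi-argument constructors used in Examples 1 and 2 are deprecated in newer parquet-mr releases (the org.apache.parquet.* packages), which use a builder instead. A minimal sketch of the equivalent Avro setup under that API:

ParquetWriter<GenericRecord> writer = AvroParquetWriter
        .<GenericRecord>builder(new Path(outputFile))
        .withSchema(avroSchema)
        .withCompressionCodec(CompressionCodecName.SNAPPY)
        .withRowGroupSize(256 * 1024 * 1024)  // the "block size" of the old constructors
        .withPageSize(64 * 1024)
        .build();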


Example 3

public static ParquetWriter<Group> initWriter(String fileName, Map<String, String> metas)
        throws IOException {

    // publish the schema through the Configuration so GroupWriteSupport can read it back
    GroupWriteSupport.setSchema(schema, conf);

    ParquetWriter<Group> writer = new ParquetWriter<Group>(
            initFile(fileName),
            new GroupWriteSupport(metas),
            CompressionCodecName.SNAPPY,
            1024,   // block size (deliberately tiny test value)
            1024,   // page size
            512,    // dictionary page size
            true,   // enable dictionary encoding
            false,  // disable schema validation
            ParquetProperties.WriterVersion.PARQUET_1_0,
            conf);

    return writer;
}
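
Example 3 uses schema, conf, and initFile without declaring them; they are presumably a class-level MessageType, a Hadoop Configuration, and a small Path helper. A plausible sketch of those pieces (the field names in the schema are made up):

Configuration conf = new Configuration();

MessageType schema = MessageTypeParser.parseMessageType(
        "message example {\n"
      + "  required binary rowkey (UTF8);\n"
      + "  required int64 timestamp;\n"
      + "}");

static Path initFile(String fileName) {
    return new Path(fileName);
}

Note also that the stock GroupWriteSupport in parquet.hadoop.example only has a no-argument constructor; the variant here taking a Map looks like a custom subclass that stores the entries as extra file metadata.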
 



Example 4

public static void main(String[] args) throws IOException {

    int fileNum = 10;        // number of files to construct
    int fileRecordNum = 50;  // number of records in each file
    int rowKey = 0;
    for (int i = 0; i < fileNum; ++i) {

        // record each file's key range in its footer metadata (HBase-style start/end keys)
        Map<String, String> metas = new HashMap<>();
        metas.put(HConstants.START_KEY, genRowKey("%10d", rowKey + 1));
        metas.put(HConstants.END_KEY, genRowKey("%10d", rowKey + fileRecordNum));

        ParquetWriter<Group> writer = initWriter("pfile/scanner_test_file" + i, metas);

        for (int j = 0; j < fileRecordNum; ++j) {
            rowKey++;
            Group group = sfg.newGroup().append("rowkey", genRowKey("%10d", rowKey))
                    .append("cf:name", "wangxiaoyi" + rowKey)
                    .append("cf:age", String.format("%10d", rowKey))
                    .append("cf:job", "student")
                    .append("timestamp", System.currentTimeMillis());
            writer.write(group);
        }

        writer.close();
    }
}
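
The group factory sfg and the helper genRowKey are not shown. A plausible reading is that sfg is a SimpleGroupFactory over the writer's schema, and genRowKey pads the numeric key to a fixed width so the string keys sort in numeric order. A hypothetical sketch:

// build Groups against the same MessageType the writer was configured with
SimpleGroupFactory sfg = new SimpleGroupFactory(schema);

// e.g. genRowKey("%10d", 42) -> "        42"
static String genRowKey(String format, int rowKey) {
    return String.format(format, rowKey);
}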
 


Example 5


@Override
public void write(T model) throws IOException {
    ParquetWriter<T> writer = prepareWriter();
    writer.write(model);

    // count the record just written
    counter.add(1);
}
 




Example 6

@SuppressWarnings("unchecked")
private ParquetWriter<T> prepareWriter() throws IOException {
    // lazily create the writer on first use and cache it in currentWriter
    ParquetWriter<T> writer = currentWriter;
    if (writer == null) {
        if (LOG.isInfoEnabled()) {
            LOG.info(MessageFormat.format(
                    Messages.getString("ParquetFileOutput.infoCreate"), //$NON-NLS-1$
                    descriptor.getDataModelClass().getSimpleName(),
                    path));
        }
        Options opts = options;
        writer = LIBRARY_VERSION.newInstance(
                path,
                (WriteSupport<T>) writeSupport,
                opts,
                configuration);
        currentWriter = writer;
    }
    return writer;
}

Example 7
@JsonCreator
public OutputStreamParquet(@JsonProperty("schema") JsonNode nodeSchema,
                           @JsonProperty("path") String path) throws IOException {
    // "_optional-strings" is a custom extension to the Avro schema JSON: each
    // listed name is appended to "fields" as an optional ["null","string"] union
    if (nodeSchema.hasNonNull("_optional-strings")) {
        ArrayNode fields = (ArrayNode) nodeSchema.get("fields");
        ArrayNode optionalStrings = (ArrayNode) nodeSchema.get("_optional-strings");
        Iterator<JsonNode> optionalStringIterator = optionalStrings.elements();
        while (optionalStringIterator.hasNext()) {
            String optionalString = optionalStringIterator.next().asText();
            ObjectNode wrapper = ((ObjectNode) nodeSchema).objectNode();
            ArrayNode unionType = wrapper.arrayNode();
            unionType.add("null");
            unionType.add("string");
            wrapper.put("name", optionalString);
            wrapper.set("type", unionType);
            fields.add(wrapper);
        }
    }
    String schema = nodeSchema.toString();
    outputSchema = new Schema.Parser().parse(schema);
    parquetWriter = new AvroParquetWriter<>(
            new Path(path), outputSchema, compressionCodecName,
            ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE);
}
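
A minimal usage sketch for this constructor; the record and field names below are made up. The input is a normal Avro record schema in JSON plus the custom _optional-strings array described above:

ObjectMapper mapper = new ObjectMapper();
JsonNode schemaNode = mapper.readTree(
        "{\"type\":\"record\",\"name\":\"Event\","
      + "\"fields\":[{\"name\":\"id\",\"type\":\"string\"}],"
      + "\"_optional-strings\":[\"note\"]}");
OutputStreamParquet output = new OutputStreamParquet(schemaNode, "/tmp/event.parquet");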


Example 8
/**
 * Write the file.
 *
 * @param args the command-line arguments
 * @return the process exit code
 * @throws Exception if something goes wrong
 */
public int run(final String[] args) throws Exception {

  Cli cli = Cli.builder().setArgs(args).addOptions(CliCommonOpts.IOFileOpts.values()).build();
  int result = cli.runCmd();

  if (result != 0) {
    return result;
  }

  File inputFile = new File(cli.getArgValueAsString(CliCommonOpts.IOFileOpts.INPUT));
  Path outputPath = new Path(cli.getArgValueAsString(CliCommonOpts.IOFileOpts.OUTPUT));

  AvroParquetWriter<Stock> writer =
      new AvroParquetWriter<Stock>(outputPath, Stock.SCHEMA$,
          CompressionCodecName.SNAPPY,
          ParquetWriter.DEFAULT_BLOCK_SIZE,
          ParquetWriter.DEFAULT_PAGE_SIZE,
          true);

  for (Stock stock : AvroStockUtils.fromCsvFile(inputFile)) {
    writer.write(stock);
  }

  writer.close();

  return 0;
}
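
One pattern worth noting across all of these examples: the writer is closed manually, so the file handle can leak if write() throws. Since ParquetWriter implements Closeable, try-with-resources is safer; a sketch of Example 8's loop rewritten that way:

try (AvroParquetWriter<Stock> writer =
        new AvroParquetWriter<Stock>(outputPath, Stock.SCHEMA$,
            CompressionCodecName.SNAPPY,
            ParquetWriter.DEFAULT_BLOCK_SIZE,
            ParquetWriter.DEFAULT_PAGE_SIZE,
            true)) {
    for (Stock stock : AvroStockUtils.fromCsvFile(inputFile)) {
        writer.write(stock);
    }
}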

