分享

彻底明白Flink系统学习18:【Flink1.7】如何在Flink中使用Hadoop MapReduce代码

pig2 2019-1-3 16:20:43 发表于 连载型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 3 16761

问题导读

1.Flink中可以使用Hadoop哪些内容?
2.Hadoop InputFormat在Flink中视为什么?
3.FlatMapFunction与Hadoop哪一个函数功能类似?
4.如何将MapReduce代码转换为Flink代码?


彻底明白Flink系统学习17:【Flink1.7】DataSet 编程之如何读取外部文件
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26562


Flink与Apache Hadoop MapReduce接口兼容,因此允许重用Hadoop MapReduce实现的代码。

Flink可以做以下事情:
在Flink程序中使用Hadoop的Writable数据类型。
使用任何Hadoop InputFormat作为DataSource。
使用任何Hadoop OutputFormat作为DataSink。
使用Hadoop Mapper作为FlatMapFunction。
使用Hadoop Reducer作为GroupReduceFunction。


本文章是关于如何在Flink中使用现有的Hadoop MapReduce代码。

项目配置

在flink-java和flink-scala Maven模块中支持Hadoop输入/输出格式,在编写Flink作业时需要这些模块。 代码位于org.apache.flink.api.java.hadoop和org.apache.flink.api.scala.hadoop中的子包。

flink-hadoop兼容Maven模块中包含对Hadoop Mappers和Reducers的支持。 此代码在org.apache.flink.hadoopcompatibility包中。

如果要重用Mappers和Reducers,添加以下依赖项添加到pom.xml。

[mw_shl_code=scala,true]<dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-hadoop-compatibility_2.11</artifactId>
        <version>1.7.0</version>
</dependency>[/mw_shl_code]

使用Hadoop数据类型
Flink支持所有开箱即用的Hadoop Writable和WritableComparable数据类型。 如果只想使用Hadoop数据类型,则不需要包含Hadoop兼容性依赖项。

使用Hadoop 输入格式

要将Hadoop InputFormats与Flink一起使用,必须首先使用HadoopInputs类的readHadoopFile或createHadoopInput来封装格式。前者用于从fileinputformat派生的输入格式,而后者必须用于通用输入格式。生成的输入格式可用于使用ExecutionEnvironmen CreateInput创建数据源。

结果数据集包含元组(2),其中第一个字段是key,第二个字段是从hadoop inputformat取到的值。

下面的示例演示如何使用Hadoop的textinputformat。
[mw_shl_code=scala,true]val env = ExecutionEnvironment.getExecutionEnvironment

val input: DataSet[(LongWritable, Text)] =
  env.createInput(HadoopInputs.readHadoopFile(
                    new TextInputFormat, classOf[LongWritable], classOf[Text], textPath))

// Do something with the data.
[...][/mw_shl_code]

使用Hadoop输出格式
Flink兼容封装 Hadoop OutputFormats,任何继承org.apache.hadoop.mapred.OutputFormat 或则extends org.apache.hadoop.mapreduce.OutputFormat都是支持的。OutputFormat封装期望输入数据是一个包含key和值的元组(2)的数据集。

下面的示例演示如何使用Hadoop的textOutputFormat。
[mw_shl_code=scala,true]// Obtain your result to emit.
val hadoopResult: DataSet[(Text, IntWritable)] = [...]

val hadoopOF = new HadoopOutputFormat[Text,IntWritable](
  new TextOutputFormat[Text, IntWritable],
  new JobConf)

hadoopOF.getJobConf.set("mapred.textoutputformat.separator", " ")
FileOutputFormat.setOutputPath(hadoopOF.getJobConf, new Path(resultPath))

hadoopResult.output(hadoopOF)[/mw_shl_code]

使用Hadoop Mappers和Reducers

Hadoop Mappers在语义上等同于Flink的FlatMapFunctions,而Hadoop Reducers等同于Flink的GroupReduceFunctions。 Flink为Hadoop MapReduce的Mapper和Reducer接口的实现提供封装,即,可以在常规Flink程序中重用Hadoop Mappers和Reducers。 目前,仅支持Hadoop的mapred API(org.apache.hadoop.mapred)的Mapper和Reduce接口。

封装器取DataSet<Tuple2<KEYIN,VALUEIN>>作为输入和生成一个DataSet <Tuple2 <KEYOUT,VALUEOUT >>作为输出,KEYIN和KEYOUT是keys,VALUEIN和VALUEOUT是Hadoop key-value对的values。对于Reducers,Flink为GroupReduceFunction提供了一个封装器(HadoopReduceCombineFunction),没有Combiner (HadoopReduceFunction)。封装器接受可选的JobConf对象来配置Hadoop Mapper或Reducer。

Flink’s 函数封装器
  • org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction,
  • org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction
  • org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction.


可用作常规Flink FlatMapFunctions或GroupReduceFunctions。
以下示例显示如何使用Hadoop Mapper和Reducer函数。
[mw_shl_code=scala,true]// Obtain data to process somehow.
DataSet<Tuple2<Text, LongWritable>> text = [...]

DataSet<Tuple2<Text, LongWritable>> result = text
  // use Hadoop Mapper (Tokenizer) as MapFunction
  .flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
    new Tokenizer()
  ))
  .groupBy(0)
  // use Hadoop Reducer (Counter) as Reduce- and CombineFunction
  .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
    new Counter(), new Counter()
  ));[/mw_shl_code]
请注意:Reducer封装器适用于Flink的groupBy()操作定义的组。 它不考虑可能在JobConf中设置的任何自定义分区器,排序或分组比较。

完成Hadoop WordCount示例
以下示例使用Hadoop数据类型,Input-和OutputFormats以及Mapper和Reducer实现的完整WordCount实现。
[mw_shl_code=scala,true]ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Set up the Hadoop TextInputFormat.
Job job = Job.getInstance();
HadoopInputFormat<LongWritable, Text> hadoopIF =
  new HadoopInputFormat<LongWritable, Text>(
    new TextInputFormat(), LongWritable.class, Text.class, job
  );
TextInputFormat.addInputPath(job, new Path(inputPath));

// Read data using the Hadoop TextInputFormat.
DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopIF);

DataSet<Tuple2<Text, LongWritable>> result = text
  // use Hadoop Mapper (Tokenizer) as MapFunction
  .flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
    new Tokenizer()
  ))
  .groupBy(0)
  // use Hadoop Reducer (Counter) as Reduce- and CombineFunction
  .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
    new Counter(), new Counter()
  ));

// Set up the Hadoop TextOutputFormat.
HadoopOutputFormat<Text, IntWritable> hadoopOF =
  new HadoopOutputFormat<Text, IntWritable>(
    new TextOutputFormat<Text, IntWritable>(), job
  );
hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
TextOutputFormat.setOutputPath(job, new Path(outputPath));

// Emit data using the Hadoop TextOutputFormat.
result.output(hadoopOF);

// Execute Program
env.execute("Hadoop WordCount");[/mw_shl_code]

最新经典文章,欢迎关注公众号

本帖被以下淘专辑推荐:

已有(3)人评论

跳转到指定楼层
@小猪佩奇 发表于 2019-1-8 15:21:50
楼主能把导的包也发一下吗
回复

使用道具 举报

若无梦何远方 发表于 2019-9-7 09:00:54
长见识了 谢谢
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条