分享

spring-batch+spring-hadoop+olh整合

pig2 发表于 2015-3-23 13:57:04 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 2 49801

问题导读
1.本文是如何用spring-hadoop和spring-batch对MR整合的?
2.mvn install:install-file -DgroupId=com.oracle -DartifactID=xx -Dversion=2.1.0 -Dpackaging=jar -Dfile=xx.jar的作用是什么?







参加大会回来,讲师们的一个观点让我很受启发:就是淘宝和百度的Hadoop工程师们,会写10几个步骤的连续的MR作业来做运算,也不会去写低效 hive。在我的实际工作中,最多有四步的MR运算,代码已经比较混乱了,并不易于维护。这两天用spring-hadoop和spring-batch对MR整合,形成工作流,代码逻辑清晰很多,以后维护也方便了

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3.         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4.         xmlns:hdp="http://www.springframework.org/schema/hadoop"
  5.         xmlns:context="http://www.springframework.org/schema/context"
  6.         xmlns:batch="http://www.springframework.org/schema/batch"
  7.         xmlns:p="http://www.springframework.org/schema/p"
  8.         xsi:schemaLocation="http://www.springframework.org/schema/beans
  9.                 http://www.springframework.org/schema/beans/spring-beans.xsd
  10.         http://www.springframework.org/schema/context
  11.         http://www.springframework.org/schema/context/spring-context.xsd
  12.         http://www.springframework.org/schema/hadoop
  13.         http://www.springframework.org/schema/hadoop/spring-hadoop.xsd
  14.         http://www.springframework.org/schema/batch
  15.         http://www.springframework.org/schema/batch/spring-batch-2.2.xsd">
  16.         <hdp:configuration>
  17.                 fs.default.name=${fs.default.name}
  18.         </hdp:configuration>
  19.         
  20.         <context:property-placeholder location="classpath:etlHadoop.properties" />
  21.         
  22.         <bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean" />
  23.         <bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
  24.         <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher" p:jobRepository-ref="jobRepository" />
  25.         
  26.         <bean id="etlDriver" class="com.etl.EtlChainDriver">
  27.         </bean>
  28.         
  29.         <bean id="loadJob" class="oracle.hadoop.loader.OraLoader">
  30.         </bean>        
  31.         
  32.         <bean id="etlInitStepListener" class="com.etl.listener.EtlInitStepListener"/>
  33.         <bean id="etlJobListener" class="com.etl.listener.EtlJobListener"/>
  34.         
  35.         <batch:job id="etlJob">
  36.                 <batch:step id="initjs">
  37.                         <hdp:script-tasklet id="script-tasklet">
  38.                                 <hdp:script language="javascript">
  39.                                         importPackage(java.util);
  40.                                         importPackage(java.lang);
  41.                                        
  42.                                         name = UUID.randomUUID().toString()
  43.                                         System.out.println(name)
  44.                                 </hdp:script>
  45.                         </hdp:script-tasklet>
  46.                         <batch:next on="COMPLETED" to="etl" />
  47.                         <batch:listeners>
  48.                                 <batch:listener ref="etlInitStepListener"></batch:listener>
  49.                         </batch:listeners>
  50.                 </batch:step>
  51.                
  52.                 <batch:step id="etl">
  53.                         <!-- spring batch设置scope为step,表示生成step的时候才创建bean,因为这个时候jobParameters才传过来 -->
  54.                         <hdp:tool-tasklet id="tool-tasklet" tool-ref="etlDriver" scope="step">
  55.                                 <hdp:arg value="#{jobParameters['etlinput']}" />
  56.                                 <hdp:arg value="#{jobParameters['etloutput']}" />
  57.                         </hdp:tool-tasklet>
  58.                 </batch:step>
  59.                 <batch:listeners>
  60.                            <batch:listener ref="etlJobListener"/>
  61.                   </batch:listeners>
  62.         </batch:job>
  63.         
  64.         <batch:job id="loadToOracle">
  65.                 <batch:step id="loadStep">
  66.                         <hdp:tool-tasklet id="jar-tasklet" tool-ref="loadJob" scope="step">
  67.                                 <!-- properties -->
  68.                                 mapred.input.dir=#{jobParameters['mapred.input.dir']}
  69.                                 mapred.output.dir=#{jobParameters['mapred.output.dir']}
  70.                                 mapreduce.outputformat.class=#{jobParameters['mapreduce.outputformat.class']}
  71.                                 oracle.hadoop.loader.loaderMapFile=#{jobParameters['oracle.hadoop.loader.loaderMapFile']}
  72.                                 mapreduce.inputformat.class=#{jobParameters['mapreduce.inputformat.class']}
  73.                                 mapreduce.outputformat.class=#{jobParameters['mapreduce.outputformat.class']}
  74.                                 <!-- 这里必须硬编码\u0009(表示制表符),否则运行作业报没有fieldTerminator -->
  75.                                 oracle.hadoop.loader.input.fieldTerminator=\u0009
  76.                                 oracle.hadoop.loader.olhcachePath=#{jobParameters['oracle.hadoop.loader.olhcachePath']}
  77.                                 oracle.hadoop.loader.loadByPartition=#{jobParameters['oracle.hadoop.loader.loadByPartition']}
  78.                                 oracle.hadoop.loader.connection.url=#{jobParameters['oracle.hadoop.loader.connection.url']}
  79.                                 oracle.hadoop.loader.connection.user=#{jobParameters['oracle.hadoop.loader.connection.user']}
  80.                                 oracle.hadoop.loader.connection.password=#{jobParameters['oracle.hadoop.loader.connection.password']}
  81.                         </hdp:tool-tasklet>
  82.                 </batch:step>
  83.         </batch:job>
  84. </beans>
复制代码
总的来说是两大步骤,先对原始日志进行ETL,然后用OLH将洗过的数据导入到oracle中,注意ETL中分为两步:

先用脚本做一些初始化工作,我这里随便写了点代码,实际中可以拷贝数据,删除已存在的output目录等等;

第二步是真正的ETL执行。通过定义StepExecutionListener和JobExecutionListener,我们可以在任务完成时进行回调操作,完成一些逻辑处理,在我的例子中就是com.etl.listener.EtlInitStepListener和com.etl.listener.EtlJobListener。所有的JobParameters都在java代码中赋值,除了fieldTerminator,这在实际应用中具有很大的灵活性。

项目采用maven管理,pom如下,

  1. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  2.         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  3.         <modelVersion>4.0.0</modelVersion>
  4.         <groupId>com</groupId>
  5.         <artifactId>etl</artifactId>
  6.         <version>0.0.1-SNAPSHOT</version>
  7.         <packaging>jar</packaging>
  8.         <name>etl</name>
  9.         <properties>
  10.                 <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  11.                 <hadoop.version>1.1.2</hadoop.version>
  12.                 <pig.version>0.11.1</pig.version>
  13.                 <hive.version>0.10.0</hive.version>
  14.                 <hbase.version>0.94.7</hbase.version>
  15.                 <zookeeper.version>3.4.5</zookeeper.version>
  16.                 <spring.hadoop.version>1.0.0.RELEASE</spring.hadoop.version>
  17.                 <spring.batch.version>2.2.0.RELEASE</spring.batch.version>
  18.                 <odch.version>2.1.0</odch.version>
  19.         </properties>
  20.         <dependencies>
  21.                 <dependency>
  22.                         <groupId>junit</groupId>
  23.                         <artifactId>junit</artifactId>
  24.                         <version>4.11</version>
  25.                         <scope>test</scope>
  26.                 </dependency>
  27.                 <dependency>
  28.                         <groupId>joda-time</groupId>
  29.                         <artifactId>joda-time</artifactId>
  30.                         <version>2.2</version>
  31.                 </dependency>
  32.                 <dependency>
  33.                         <groupId>org.hamcrest</groupId>
  34.                         <artifactId>hamcrest-all</artifactId>
  35.                         <version>1.3</version>
  36.                         <scope>test</scope>
  37.                 </dependency>
  38.                 <dependency>
  39.                         <groupId>org.apache.hadoop</groupId>
  40.                         <artifactId>hadoop-core</artifactId>
  41.                         <version>${hadoop.version}</version>
  42.                 </dependency>
  43.                 <dependency>
  44.                         <groupId>org.apache.hadoop</groupId>
  45.                         <artifactId>hadoop-tools</artifactId>
  46.                         <version>${hadoop.version}</version>
  47.                 </dependency>
  48.                 <dependency>
  49.                         <groupId>org.apache.mrunit</groupId>
  50.                         <artifactId>mrunit</artifactId>
  51.                         <version>0.9.0-incubating</version>
  52.                         <classifier>hadoop1</classifier>
  53.                 </dependency>
  54.                 <dependency>
  55.                         <groupId>org.apache.pig</groupId>
  56.                         <artifactId>pig</artifactId>
  57.                         <version>${pig.version}</version>
  58.                 </dependency>
  59.                 <dependency>
  60.                         <groupId>org.apache.hbase</groupId>
  61.                         <artifactId>hbase</artifactId>
  62.                         <version>${hbase.version}</version>
  63.                 </dependency>
  64.                 <dependency>
  65.                         <groupId>org.apache.hive</groupId>
  66.                         <artifactId>hive-exec</artifactId>
  67.                         <version>${hive.version}</version>
  68.                 </dependency>
  69.                 <dependency>
  70.                         <groupId>javax.jdo</groupId>
  71.                         <artifactId>jdo2-api</artifactId>
  72.                         <version>2.3-eb</version>
  73.                 </dependency>
  74.                 <dependency>
  75.                         <groupId>org.apache.zookeeper</groupId>
  76.                         <artifactId>zookeeper</artifactId>
  77.                         <version>${zookeeper.version}</version>
  78.                 </dependency>
  79.                 <dependency>
  80.                         <groupId>org.springframework.data</groupId>
  81.                         <artifactId>spring-data-hadoop</artifactId>
  82.                         <version>${spring.hadoop.version}</version>
  83.                 </dependency>
  84.                 <dependency>
  85.                         <groupId>org.springframework.batch</groupId>
  86.                         <artifactId>spring-batch-core</artifactId>
  87.                         <version>${spring.batch.version}</version>
  88.                 </dependency>
  89.                 <dependency>
  90.                         <groupId>org.springframework</groupId>
  91.                         <artifactId>spring-context-support</artifactId>
  92.                         <version>3.2.0.RELEASE</version>
  93.                 </dependency>
  94.                 <!-- odch -->
  95.                 <dependency>
  96.                         <groupId>com.oracle</groupId>
  97.                         <artifactId>ojdbc6</artifactId>
  98.                         <version>10.2.0.2.0</version>
  99.                 </dependency>
  100.                 <dependency>
  101.                         <groupId>com.oracle</groupId>
  102.                         <artifactId>oraloader</artifactId>
  103.                         <version>${odch.version}</version>
  104.                 </dependency>
  105.                 <dependency>
  106.                         <groupId>com.oracle</groupId>
  107.                         <artifactId>orai18n</artifactId>
  108.                         <version>${odch.version}</version>
  109.                 </dependency>
  110.                 <dependency>
  111.                         <groupId>com.oracle</groupId>
  112.                         <artifactId>orai18n-collation</artifactId>
  113.                         <version>${odch.version}</version>
  114.                 </dependency>
  115.                 <dependency>
  116.                         <groupId>com.oracle</groupId>
  117.                         <artifactId>orai18n-mapping</artifactId>
  118.                         <version>${odch.version}</version>
  119.                 </dependency>
  120.                 <dependency>
  121.                         <groupId>com.oracle</groupId>
  122.                         <artifactId>orai18n-utility</artifactId>
  123.                         <version>${odch.version}</version>
  124.                 </dependency>
  125.                 <dependency>
  126.                         <groupId>com.oracle</groupId>
  127.                         <artifactId>ora-hadoop-common</artifactId>
  128.                         <version>${odch.version}</version>
  129.                 </dependency>
  130.                 <dependency>
  131.                         <groupId>com.oracle</groupId>
  132.                         <artifactId>osdt_core</artifactId>
  133.                         <version>${odch.version}</version>
  134.                 </dependency>
  135.                 <dependency>
  136.                         <groupId>com.oracle</groupId>
  137.                         <artifactId>osdt_cert</artifactId>
  138.                         <version>${odch.version}</version>
  139.                 </dependency>
  140.                 <dependency>
  141.                         <groupId>com.oracle</groupId>
  142.                         <artifactId>oraclepki</artifactId>
  143.                         <version>${odch.version}</version>
  144.                 </dependency>
  145.                 <dependency>
  146.                         <groupId>org.apache.avro</groupId>
  147.                         <artifactId>avro</artifactId>
  148.                         <version>1.6.3</version>
  149.                 </dependency>
  150.                 <dependency>
  151.                         <groupId>org.apache.avro</groupId>
  152.                         <artifactId>avro-mapred</artifactId>
  153.                         <version>1.6.3</version>
  154.                 </dependency>
  155.         </dependencies>
  156.     <build>
  157.         <finalName>etl</finalName>
  158.         <plugins>
  159.             <plugin>
  160.                 <groupId>org.apache.maven.plugins</groupId>
  161.                 <artifactId>maven-compiler-plugin</artifactId>
  162.                 <version>2.3.2</version>
  163.                 <configuration>
  164.                     <source>1.6</source>
  165.                     <target>1.6</target>
  166.                 </configuration>
  167.             </plugin>
  168.             <plugin>
  169.                 <groupId>org.codehaus.mojo</groupId>
  170.                 <artifactId>appassembler-maven-plugin</artifactId>
  171.                 <version>1.3.1</version>
  172.                 <configuration>
  173.                         <repositoryLayout>flat</repositoryLayout>
  174.                             <programs>
  175.                                     <program>
  176.                                             <mainClass>com.etl.Main</mainClass>
  177.                                             <name>etl</name>
  178.                                     </program>
  179.                             </programs>
  180.                 </configuration>
  181.             </plugin>
  182.         </plugins>
  183.     </build>
  184. </project>
复制代码



注意oracle的东西都不是开源的,所以都不在maven中央仓库中,执行mvn install:install-file -DgroupId=com.oracle -DartifactID=xx -Dversion=2.1.0 -Dpackaging=jar -Dfile=xx.jar装载到本地仓库中,需要装载的jar包有ojdbc6.jar(注意不能是classes.jar),oraloader.jar,orai18n.jar等(即在$OLH_HOME/jlib目录下的jar文件)。

打包及运行:mvn clean package appassembler:assemble
chmod u+x target/appassembler/bin/etl
./target/appassembler/bin/etl




已有(2)人评论

跳转到指定楼层
落魂草 发表于 2015-3-23 19:41:37
回复

使用道具 举报

tempmail 发表于 2015-3-23 23:54:55


很好很强大
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条