分享

Flink sql client如何实现远程提交Yarn

批注 2020-09-10 170622.png


链接: https://pan.baidu.com/s/1mYzfF1ptNJzPAMqzMqCRaQ 提取码: jvwe



flink sql client

基于flink-1.9.1,支持flink在不同集群模式下的sql任务提交,区别于flink官方提供的sql client模块,官方提供的是需要单独执行的服务,这里提供的是一个sdk,可以无缝接入自己的流计算平台.

功能支持


批注 2020-09-10 170657.png

quick start


standalone模式

  • 启动flink standalone集群,rest端口为8081
  • 运行com.github.mxb.flink.sql.cluster.StandaloneClusterTest中的kafkaToMysqlTest测试用例

Test用例

  • com.github.mxb.flink.sql.cluster.StandaloneClusterTest.kafkaToMysqlTest测试用例

○ 完整SQL

kafkaToMysql1.png

○ 执行StandaloneClusterTest中的kafkaToMysqlTest测试用例

kafkaToMysql2.png

○ flink-standalone集群job信息

kafkaToMysql3.png

flink on yarn模式

  • 启动flink on yarn集群,获得applicationId
  • 运行com.github.mxb.flink.sql.cluster.YarnClusterClientTest中的kafkaToMysqlTest测试用例

本地Minicluster模式

  • 运行com.github.mxb.flink.sql.local.LocalClusterClientTest里面的测试用例(可用于本地调试分布式任务)

sdk quick start


  • 项目引入flink-sql-client
  1. <dependency>
  2.     <groupId>com.github.mxb</groupId>
  3.     <artifactId>flink-sql-client</artifactId>
  4.     <version>1.0-SNAPSHOT</version>
  5. </dependency>
复制代码

  • 实例化取出clusterClient并执行SQL job
  1. public class Test{
  2.     public ClusterClient getClusterClient(){
  3.         FlinkResourceInfo standaloneResourceInfo = new FlinkResourceInfo();
  4.         standaloneResourceInfo.setResourceType(ResourceType.STANDALONE);
  5.         ClusterDescriptor clusterDescriptor = ClusterDescriptorFactory.createClusterDescriptor(standaloneResourceInfo);
  6.         StandAloneClusterId standAloneClusterId = new StandAloneClusterId("127.0.0.1", 8081);
  7.         ClusterClient clusterClient = clusterDescriptor.retrieve(standAloneClusterId);
  8.     }
  9.     public String executeSqlJob(String sql, List<File> dependencyJars){
  10.         ClusterClient clusterClient = getClusterClient();
  11.         JobRunConfig jobRunConfig = JobRunConfig.builder()
  12.                         .jobName(getTestJobName())
  13.                         .defaultParallelism(1)
  14.                         .sourceParallelism(1)
  15.                         .checkpointInterval(60_000L).build();
  16.         ProgramTargetDescriptor programTargetDescriptor = clusterClient.executeSqlJob(jobRunConfig, dependencyJars, sql);
  17.         return programTargetDescriptor.getJobId();
  18.     }
  19. }
复制代码

问题处理


执行groupByTest用例时会出现InvalidClassException异常,local class incompatible serialVersionUID;解决:在flink-parent中修改对应的类并重新引入
  1. @SerialVersionUID(value = 1)
  2. abstract class ProcessFunctionWithCleanupState
复制代码


文章作者:molsionmo
文章来源:https://github.com/molsionmo/flink-sql-client





没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条