分享

重磅!Flink源码解析环境准备及提交流程之环境准备


问题导读

1.程序的起点包含哪些内容?
2.程序入口类是哪个?
3.选择创建哪种类型的客户端?
4.客户端有哪种类型?



Flink Yarn-per-job模式提交流程如图所示:


1.png

1.程序起点



1. flink\bin\flink

=> exec $JAVA_RUN $JVM_ARGS "${log_setting[@]}"-classpath "`manglePathList"$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"

2. flink/bin/config.sh(相关环境配置都在这里)

       => JAVA_RUN=java

       => JVM_ARGS=""      => # Use conf/flink-conf.yaml

=>INTERNAL_HADOOP_CLASSPATHS="${HADOOP_CLASSPATH}:${HADOOP_CONF_DIR}:${YARN_CONF_DIR}"

3. 执行java -cp 就会开启JVM虚拟机,在虚拟机上开启CliFrontend进程,然后开始执行main方法

说明:java -cp和 -classpath一样,是指定类运行所依赖其他类的路径。

       java -cp =》开启JVM虚拟机 =》开启Process(CliFrontend)=》程序入口CliFrontend.main

4. Flink提交任务的入口类为CliFrontend。找到这个类的main方法:

在IDEA中全局查找(ctrl + n):org.apache.flink.client.cli.CliFrontend,找到CliFrontend类,并找到main方法


  1. /**
  2. * Submits the job based on the arguments.(根据参数提交作业)
  3. */
  4. public static void main(final String[] args) {
  5.        ... ...
  6.        final CliFrontend cli = new CliFrontend(
  7.                             configuration,
  8.                             customCommandLines);
  9.        ... ...
  10. }
复制代码



2.程序入口

CliFrontend.java
  1. public static void main(final String[] args) {
  2.          ... ...
  3.          final CliFrontend cli = new CliFrontend(
  4.                                    configuration,
  5.                                    customCommandLines);
  6.          ... ...
  7.         int retCode =SecurityUtils.getInstalledContext()
  8.                             .runSecured(() -> cli.parseParameters(args));
  9.          ... ...
  10. }
复制代码



  1. public int parseParameters(String[] args) {
  2.          ... ...
  3.          // get action
  4.          String action = args[0];
  5.          // remove action from parameters
  6.          final String[] params =Arrays.copyOfRange(args, 1, args.length);
  7.         
  8.                   // do action
  9.                   switch (action) {
  10.                           case ACTION_RUN:
  11.                                    run(params);
  12.                                    return 0;
  13.                           case ACTION_LIST:
  14.                                    list(params);
  15.                                    return 0;
  16.                           case ACTION_INFO:
  17.                                    info(params);
  18.                                    return 0;
  19.                           case ACTION_CANCEL:
  20.                                    cancel(params);
  21.                                    return 0;
  22.                           case ACTION_STOP:
  23.                                    stop(params);
  24.                                    return 0;
  25.                           case ACTION_SAVEPOINT:
  26.                                    savepoint(params);
  27.                                    return 0;
  28.                           ……
  29.                   }
  30.          ... ...
  31. }
复制代码


3.解析输入参数

CliFrontend.java
  1. protected void run(String[] args) throws Exception {
  2.        ... ...
  3.        //获取默认的运行参数
  4.        final Options commandOptions =CliFrontendParser.getRunCommandOptions();
  5.          // 解析参数,返回commandLine
  6.        final CommandLine commandLine = getCommandLine(commandOptions, args, true);
  7.        ... ...
  8. }
复制代码


  1. public CommandLine getCommandLine(final OptionscommandOptions, final String[] args, final boolean stopAtNonOptions) throwsCliArgsException {
  2.          final Options commandLineOptions = CliFrontendParser.mergeOptions(commandOptions,customCommandLineOptions);
  3.          return CliFrontendParser.parse(commandLineOptions, args,stopAtNonOptions);
  4. }
复制代码


DefaultParser.java
  1. public class CliFrontendParser {
  2.          // 选项列表
  3.          static final Option HELP_OPTION = new Option("h", "help", false,
  4.                           "Show the helpmessage for the CLI Frontend or the action.");
  5.          static final Option JAR_OPTION = new Option("j", "jarfile", true, "Flink program JAR file.");
  6.          static final Option CLASS_OPTION = new Option("c", "class", true,
  7.                           "Class with theprogram entry point ("main()" method). Only needed if the " +
  8.                           "JAR file doesnot specify the class in its manifest.");
  9. ... ...
  10. }
复制代码



DefaultParser.java
  1. public CommandLine parse(Options options, String[] arguments,Properties properties, boolean stopAtNonOption)
  2.             throws ParseException
  3. {
  4.     ... ...
  5.     if (arguments != null)
  6.     {
  7.         for (String argument : arguments)
  8.         {
  9.             handleToken(argument);
  10.         }
  11.     }
  12. ... ...
  13. }
复制代码


  1. private void handleToken(String token) throwsParseException
  2. {
  3.     currentToken = token;
  4.     if (skipParsing)
  5.     {
  6.         cmd.addArg(token);
  7.     }
  8.     else if ("--".equals(token))
  9.     {
  10.         skipParsing = true;
  11.     }
  12.     else if (currentOption != null &¤tOption.acceptsArg() && isArgument(token))
  13.     {
  14.         currentOption.addValueForProcessing(Util.stripLeadingAndTrailingQuotes(token));
  15.     }
  16.     else if (token.startsWith("--"))
  17. {
  18.          // 解析--形式的参数
  19.         handleLongOption(token);
  20.     }
  21.     else if (token.startsWith("-") && !"-".equals(token))
  22. {
  23.          // 解析 -形式的参数
  24.         handleShortAndLongOption(token);
  25.     }
  26.     else
  27.     {
  28.         handleUnknownToken(token);
  29.     }
  30.     if (currentOption != null && !currentOption.acceptsArg())
  31.     {
  32.         currentOption = null;
  33.     }
  34. }
复制代码

  1. private void handleLongOption(String token) throwsParseException
  2. {
  3.     if (token.indexOf('=') == -1)
  4. {
  5.          //解析–L、-L、--l、-l形式的参数(不包含=)
  6.         handleLongOptionWithoutEqual(token);
  7.     }
  8.     else
  9. {
  10.          // 解析--L=V、-L=V、--l=V、-l=V形式的参数(包含=)
  11.         handleLongOptionWithEqual(token);
  12.     }
  13. }
复制代码


各种情况的解析,逻辑大体相同:去除-或--前缀,校验参数,以其中一个为例
  1. private void handleLongOptionWithoutEqual(String token)throws ParseException
  2. {
  3.          // 校验参数是否合法
  4.     List<String> matchingOpts = options.getMatchingOptions(token);
  5.     if (matchingOpts.isEmpty())
  6.     {
  7.         handleUnknownToken(currentToken);
  8.     }
  9.     else if (matchingOpts.size() > 1)
  10.     {
  11.         throw newAmbiguousOptionException(token, matchingOpts);
  12.     }
  13.     else
  14. {
  15. // 参数添加到执行命令
  16.         handleOption(options.getOption(matchingOpts.get(0)));
  17.     }
  18. }
复制代码



Options.java:
  1. public List<String> getMatchingOptions(String opt)
  2. {
  3.          // 去除 -或 -- 前缀
  4.     opt = Util.stripLeadingHyphens(opt);
  5.    
  6.     List<String> matchingOpts = newArrayList<String>();
  7.     // for a perfect match return the singleoption only
  8.     if (longOpts.keySet().contains(opt))
  9.     {
  10.         return Collections.singletonList(opt);
  11.     }
  12.     for (String longOpt : longOpts.keySet())
  13.     {
  14.         if (longOpt.startsWith(opt))
  15.         {
  16.             matchingOpts.add(longOpt);
  17.         }
  18.     }
  19.    
  20.     return matchingOpts;
  21. }
复制代码



DefaultParser.java
  1. private void handleOption(Option option) throwsParseException
  2. {
  3.     // check the previous option beforehandling the next one
  4.     checkRequiredArgs();
  5.     option = (Option) option.clone();
  6.     updateRequiredOptions(option);
  7.     cmd.addOption(option);
  8.     if (option.hasArg())
  9.     {
  10.         currentOption = option;
  11.     }
  12.     else
  13.     {
  14.         currentOption = null;
  15.     }
  16. }
复制代码



4.选择创建哪种类型的客户端

CliFrontend.java
  1. public static void main(final String[] args) {
  2.          ... ...
  3.          final List<CustomCommandLine> customCommandLines = loadCustomCommandLines(
  4.                           configuration,
  5.                           configurationDirectory);
  6.          ... ...
  7.          final CliFrontend cli = new CliFrontend(
  8.                                    configuration,
  9.                                    customCommandLines);
  10.          ... ...
  11. }
复制代码



这里依次添加了 Yarn和Default(Standalone)两种客户端(后面根据isActive()选择):
  1. public static List<CustomCommandLine> loadCustomCommandLines(Configurationconfiguration, String configurationDirectory) {
  2.          List<CustomCommandLine>customCommandLines = new ArrayList<>();
  3.          customCommandLines.add(newGenericCLI(configuration, configurationDirectory));
  4.          //       Commandline interface of the YARN session, with a special initialization here
  5.          //       toprefix all options with y/yarn.
  6.          final String flinkYarnSessionCLI ="org.apache.flink.yarn.cli.FlinkYarnSessionCli";
  7.          try {
  8.                   customCommandLines.add(
  9.                           loadCustomCommandLine(flinkYarnSessionCLI,
  10.                                    configuration,
  11.                                    configurationDirectory,
  12.                                    "y",
  13.                                    "yarn"));
  14.          } catch (NoClassDefFoundError |Exception e) {
  15.                   final StringerrorYarnSessionCLI ="org.apache.flink.yarn.cli.FallbackYarnSessionCli";
  16.                   try {
  17.                           LOG.info("LoadingFallbackYarnSessionCli");
  18.                           customCommandLines.add(
  19.                                             loadCustomCommandLine(errorYarnSessionCLI,configuration));
  20.                   } catch (Exception exception){
  21.                           LOG.warn("Couldnot load CLI class {}.", flinkYarnSessionCLI, e);
  22.                   }
  23.          }
  24.          //       Tips:DefaultCLI must be added at last, because getActiveCustomCommandLine(..) willget the
  25.          //             active CustomCommandLine in order andDefaultCLI isActive always return true.
  26.          customCommandLines.add(new DefaultCLI(configuration));
  27.          return customCommandLines;
  28. }
复制代码



在run()里面,进行客户端的选择:
  1. protected void run(String[] args) throws Exception {
  2.        ... ...
  3.        final CustomCommandLine activeCommandLine =
  4.                                    validateAndGetActiveCommandLine(checkNotNull(commandLine));
  5. ... ...
  6. }
复制代码


  1. public CustomCommandLine validateAndGetActiveCommandLine(CommandLinecommandLine) {
  2. ... ...
  3.          for (CustomCommandLine cli :customCommandLines) {
  4.          ... ...
  5.          //在FlinkYarnSessionCli为active时优先返回FlinkYarnSessionCli。
  6.                   //对于DefaultCli,它的isActive方法总是返回true。
  7.                   if (cli.isActive(commandLine)) {
  8.                           return cli;
  9.                   }
  10.          }
  11. ... ...
  12. }
复制代码


FlinkYarnSessionCli.java => Yarn客户端isActive的判断逻辑:
  1. public boolean isActive(CommandLine commandLine) {
  2.          final String jobManagerOption = commandLine.getOptionValue(addressOption.getOpt(),null);
  3.          //是否指定为per-job模式,即指定”-m yarn-cluster”; ID = "yarn-cluster"
  4.          final boolean yarnJobManager =ID.equals(jobManagerOption);
  5.          // 是否存在flink在yarn的appID,即yarn-session模式是否启动
  6.          final boolean hasYarnAppId =commandLine.hasOption(applicationId.getOpt())
  7.                           ||configuration.getOptional(YarnConfigOptions.APPLICATION_ID).isPresent();
  8.          // executor的名字为"yarn-session" 或 "yarn-per-job"
  9.          final boolean hasYarnExecutor = YarnSessionClusterExecutor.NAME.equals(configuration.get(DeploymentOptions.TARGET))
  10.                          ||YarnJobClusterExecutor.NAME.equals(configuration.get(DeploymentOptions.TARGET));
  11.          //
  12.         return hasYarnExecutor || yarnJobManager|| hasYarnAppId || (isYarnPropertiesFileMode(commandLine) &&yarnApplicationIdFromYarnProperties != null);
  13. }
复制代码



5.获取有效配置
CliFrontend.java
  1. protected void run(String[] args) throws Exception {
  2.        ... ...
  3.        final Configuration effectiveConfiguration = getEffectiveConfiguration(
  4.                                    activeCommandLine,commandLine, programOptions, jobJars);
  5. ... ...
  6. }
复制代码


  1. private Configuration getEffectiveConfiguration(
  2.                   final CommandLine commandLine,
  3.                   final ProgramOptionsprogramOptions,
  4.                   final List<URL> jobJars)throws FlinkException {
  5. ... ...
  6.          final Configuration executorConfig =checkNotNull(activeCustomCommandLine)
  7.                                    .applyCommandLineOptionsToConfiguration(commandLine);
  8. ... ...
  9. }
复制代码


FlinkYarnSessionCli.java
  1. public Configuration applyCommandLineOptionsToConfiguration(CommandLinecommandLine) throws FlinkException {
  2.          // we ignore the addressOption becauseit can only contain "yarn-cluster"
  3.          final ConfigurationeffectiveConfiguration = new Configuration(configuration);
  4.          applyDescriptorOptionToConfig(commandLine,effectiveConfiguration);
  5.          final ApplicationId applicationId =getApplicationId(commandLine);
  6.          if (applicationId != null) {
  7.                   final StringzooKeeperNamespace;
  8.                   if (commandLine.hasOption(zookeeperNamespace.getOpt())){
  9.                           zooKeeperNamespace =commandLine.getOptionValue(zookeeperNamespace.getOpt());
  10.                   } else {
  11.                           zooKeeperNamespace =effectiveConfiguration.getString(HA_CLUSTER_ID, applicationId.toString());
  12.                   }
  13.                   effectiveConfiguration.setString(HA_CLUSTER_ID, zooKeeperNamespace);
  14.                   effectiveConfiguration.setString(YarnConfigOptions.APPLICATION_ID,ConverterUtils.toString(applicationId));
  15.                   // TARGET 就是execution.target,目标执行器
  16.                   //决定后面什么类型的执行器提交任务:yarn-session、yarn-per-job
  17.                   effectiveConfiguration.setString(DeploymentOptions.TARGET, YarnSessionClusterExecutor.NAME);
  18.          } else {
  19.                  effectiveConfiguration.setString(DeploymentOptions.TARGET, YarnJobClusterExecutor.NAME);
  20.          }
  21.          if (commandLine.hasOption(jmMemory.getOpt())) {
  22.                   String jmMemoryVal =commandLine.getOptionValue(jmMemory.getOpt());
  23.                   if(!MemorySize.MemoryUnit.hasUnit(jmMemoryVal)) {
  24.                           jmMemoryVal +="m";
  25.                   }
  26.                   effectiveConfiguration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY,jmMemoryVal);
  27.          }
  28.          if (commandLine.hasOption(tmMemory.getOpt())) {
  29.                   String tmMemoryVal =commandLine.getOptionValue(tmMemory.getOpt());
  30.                   if(!MemorySize.MemoryUnit.hasUnit(tmMemoryVal)) {
  31.                           tmMemoryVal +="m";
  32.                   }
  33.                   effectiveConfiguration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY,MemorySize.parse(tmMemoryVal));
  34.          }
  35.          if (commandLine.hasOption(slots.getOpt())) {
  36.                   effectiveConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS,Integer.parseInt(commandLine.getOptionValue(slots.getOpt())));
  37.          }
  38. ... ...
  39. }
复制代码



6.调用用户代码的main方法
CliFrontend.java
  1. protected void run(String[] args) throws Exception {
  2.        ... ...
  3.        executeProgram(effectiveConfiguration,program);
  4. ... ...
  5. }
复制代码


  1. protected void executeProgram(final Configurationconfiguration, final PackagedProgram program) throws ProgramInvocationException{
  2.          ClientUtils.executeProgram(newDefaultExecutorServiceLoader(), configuration, program, false, false);
  3. }
复制代码


ClientUtils.java
  1. public static void executeProgram(
  2.                   PipelineExecutorServiceLoader executorServiceLoader,
  3.                   Configuration configuration,
  4.                   PackagedProgram program,
  5.                   boolean enforceSingleJobExecution,
  6.                   boolean suppressSysout) throws ProgramInvocationException {
  7.          checkNotNull(executorServiceLoader);
  8.          final ClassLoader userCodeClassLoader =program.getUserCodeClassLoader();
  9.          final ClassLoader contextClassLoader =Thread.currentThread().getContextClassLoader();
  10.          try {
  11.                   //设置当前的classloader为用户代码的classloader
  12.                   Thread.currentThread().setContextClassLoader(userCodeClassLoader);
  13.                   LOG.info("Startingprogram (detached: {})",!configuration.getBoolean(DeploymentOptions.ATTACHED));
  14.          //用户代码中的getExecutionEnvironment会返回该Environment
  15.                   ContextEnvironment.setAsContext(
  16.                           executorServiceLoader,
  17.                           configuration,
  18.                           userCodeClassLoader,
  19.                           enforceSingleJobExecution,
  20.                           suppressSysout);
  21.                   StreamContextEnvironment.setAsContext(
  22.                           executorServiceLoader,
  23.                           configuration,
  24.                           userCodeClassLoader,
  25.                           enforceSingleJobExecution,
  26.                           suppressSysout);
  27.                   try {
  28.                           //调用用户代码的main方法
  29.                           program.invokeInteractiveModeForExecution();
  30.                   } finally {
  31.                           ContextEnvironment.unsetAsContext();
  32.                           StreamContextEnvironment.unsetAsContext();
  33.                   }
  34.          } finally {
  35.                   Thread.currentThread().setContextClassLoader(contextClassLoader);
  36.          }
  37. }
复制代码



PackagedProgram.java
  1. public void invokeInteractiveModeForExecution() throws ProgramInvocationException{
  2.          callMainMethod(mainClass, args);
  3. }
复制代码


  1. private static void callMainMethod(Class<?> entryClass,String[] args) throws ProgramInvocationException {
  2.          ... ...
  3.          mainMethod = entryClass.getMethod("main", String[].class);
  4.          ... ...
  5.        //反射调用main函数
  6.          mainMethod.invoke(null, (Object) args);
  7.          ... ...
  8. }
复制代码


7.调用执行环境的execute方法
StreamExecutionEnvironment.java
  1. public JobExecutionResult execute() throwsException {
  2.          return execute(DEFAULT_JOB_NAME);
  3. }
复制代码


  1. public JobExecutionResult execute(String jobName)throws Exception {
  2.          ... ...
  3.          return execute(getStreamGraph(jobName));
  4. }
复制代码


  1. public JobExecutionResult execute(StreamGraphstreamGraph) throws Exception {
  2.          final JobClient jobClient = executeAsync(streamGraph);
  3.          ... ...
  4. }
复制代码

  1. public JobClient executeAsync(StreamGraph streamGraph)throws Exception {
  2.          ... ...
  3.          //根据提交模式选择匹配的factory
  4.          final PipelineExecutorFactoryexecutorFactory =
  5.                   executorServiceLoader.getExecutorFactory(configuration);
  6. ... ...
  7.          //选择合适的executor提交任务
  8.          CompletableFuture<JobClient>jobClientFuture = executorFactory
  9.                   .getExecutor(configuration)
  10.                   .execute(streamGraph, configuration);
  11. ... ...
  12. }
复制代码



最新经典文章,欢迎关注公众号



原文链接
https://mp.weixin.qq.com/s/hbwvvZBKnzto5Rb89xqcog

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条