美烦资源网

专注技术文章分享,涵盖编程教程、IT 资源与前沿资讯

详解flink 1.11中的新部署模式-Application模式


  • 背景
  • per job模式的问题
  • 引入application模式
  • 通过程序提交任务
  • Application模式源码解析

  • 背景

    目前对于flink来说,生产环境一般有两个部署模式,一个是 session模式,一个是per job模式。

    session模式

    这种模式会预先在yarn或者或者k8s上启动一个flink集群,然后将任务提交到这个集群上,这种模式,集群中的任务使用相同的资源,如果某一个任务出现了问题导致整个集群挂掉,那就得重启集群中的所有任务,这样就会给集群造成很大的负面影响。

    per job模式

    考虑到集群的资源隔离情况,一般生产上的任务都会选择per job模式,也就是每个任务启动一个flink集群,各个集群之间独立运行,互不影响,且每个集群可以设置独立的配置。

    per job模式的问题

    目前,对于per job模式,jar包的解析、生成JobGraph是在客户端上执行的,然后将生成的jobgraph提交到集群。很多公司都会有自己的实时计算平台,用户可以使用这些平台提交flink任务,如果任务特别多的话,那么这些生成JobGraph、提交到集群的操作都会在实时平台所在的机器上执行,那么将会给服务器造成很大的压力。

    此外这种模式提交任务的时候会把本地flink的所有jar包先上传到hdfs上相应 的临时目录,这个也会带来大量的网络的开销,所以如果任务特别多的情况下,平台的吞吐量将会直线下降。

    引入application模式

    所以针对flink per job模式的一些问题,flink 引入了一个新的部署模式--Application模式。 目前 Application 模式支持 Yarn 和 K8s 的部署方式,Yarn Application 模式会在客户端将运行任务需要的依赖都上传到 Flink Master,然后在 Master 端进行任务的提交。

    此外,还支持远程的用户jar包来提交任务,比如可以将jar放到hdfs上,进一步减少上传jar所需的时间,从而减少部署作业的时间。

    具体的使用命令是:

    /bin/flink?run-application?-p?1?-t?yarn-application?\
    -yD?yarn.provided.lib.dirs="hdfs://localhost/flink/libs"?\
    hdfs://localhost/user-jars/HelloWold.jar

    通过程序提交任务

    当我们要做一个实时计算平台的时候,会需要通过程序来提交任务到集群,这时候需要我们自己封装一套API来实现提交flink任务到集群,目前主要的生产环境还是以yarn居多,所以我们今天讲讲怎么通过api的方式把一个任务以application的方法提交到yarn集群。

    • 引入相关的配置到classpath里 core-site.xml hdfs-site.xml yarn-site.xml
    • 定义相关的配置参数
    ??//flink的本地配置目录,为了得到flink的配置
    ??String?configurationDirectory?=?"/Users/user/work/flink/conf/";
    ??//存放flink集群相关的jar包目录
    ??String?flinkLibs?=?"hdfs://hadoopcluster/data/flink/libs";
    ??//用户jar
    ??String?userJarPath?=?"hdfs://hadoopcluster/data/flink/user-lib/TopSpeedWindowing.jar";
    ??String?flinkDistJar?=?"hdfs://hadoopcluster/data/flink/libs/flink-yarn_2.11-1.11.0.jar";
    • 获取flink的配置

    这里其实还可以设置很多的配置参数,比如yarn的队列名字等等,大家根据自己的需要来设置。

    //??获取flink的配置
    ??Configuration?flinkConfiguration?=?GlobalConfiguration.loadConfiguration(
    ????configurationDirectory);
    ????
    ?????//设置为application模式
    ??flinkConfiguration.set(
    ????DeploymentOptions.TARGET,
    ????YarnDeploymentTarget.APPLICATION.getName());
    ??//yarn?application?name
    ??flinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME,?"jobName");
    ??
    ??.........
    • 设置用户jar的参数和主类
    //??设置用户jar的参数和主类
    ??ApplicationConfiguration?appConfig?=?new?ApplicationConfiguration(args,?null);
    • 提交任务到集群
    ??YarnClusterDescriptor?yarnClusterDescriptor?=?new?YarnClusterDescriptor(
    ????flinkConfiguration,
    ????yarnConfiguration,
    ????yarnClient,
    ????clusterInformationRetriever,
    ????true);
    ??ClusterClientProvider?clusterClientProvider?=?null;
    ??try?{
    ???clusterClientProvider?=?yarnClusterDescriptor.deployApplicationCluster(
    ?????clusterSpecification,
    ?????appConfig);
    ??}?catch?(ClusterDeploymentException?e){
    ???e.printStackTrace();
    ??}

    完整代码请参考:
    https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/cluster/SubmitJobApplicationMode.java

    Application模式源码解析

    通过上面提交的脚本我们看到入口是从flink bin目录下flink命令开始的,我们看下这个文件的最后一行代码,也就是提交任务的入口类:
    org.apache.flink.client.cli.CliFrontend,接下来我们基于flink 1.11的源码简单梳理一下flink是如何把一个任务提交到yarn集群的。

    exec?$JAVA_RUN?$JVM_ARGS?$FLINK_ENV_JAVA_OPTS?"${log_setting[@]}"?-classpath?"`manglePathList?"$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`"?org.apache.flink.client.cli.CliFrontend?"$@"

    入口

    在CliFrontend的main方法里,我们看到做了这么几件事。

    1. 获取flink的配置目录
    2. 加载flink的配置
    3. 加载并解析命令行参数
    4. 通过CliFrontend.parseParameters方法来执行具体的操作
    ??//?1.?find?the?configuration?directory
    ??final?String?configurationDirectory?=?getConfigurationDirectoryFromEnv();
    
    ??//?2.?load?the?global?configuration
    ??final?Configuration?configuration?=?GlobalConfiguration.loadConfiguration(configurationDirectory);
    
    ??//?3.?load?the?custom?command?lines
    ??final?List?customCommandLines?=?loadCustomCommandLines(
    ???configuration,
    ???configurationDirectory);
    
    ??try?{
    ???final?CliFrontend?cli?=?new?CliFrontend(
    ????configuration,
    ????customCommandLines);
    
    ???SecurityUtils.install(new?SecurityConfiguration(cli.configuration));
    ???int?retCode?=?SecurityUtils.getInstalledContext()
    ?????.runSecured(()?->?cli.parseParameters(args));
    ???System.exit(retCode);
    ??}

    执行具体的操作

    在parseParameters方法里,解析出来要执行的操作,然后通过一个switch来进入要执行的方法,我们这里是进入runApplication方法。

    ???switch?(action)?{
    ????case?ACTION_RUN:
    ?????run(params);
    ?????return?0;
    ????case?ACTION_RUN_APPLICATION:
    ?????runApplication(params);
    ?????return?0;
    ????case?ACTION_LIST:
    ?????list(params);
    ?????return?0;
    ?????..........
    ??????}???

    runApplication方法

    在这个方法里,主要是用传进来的命令行参数构造出来flink的配置对象Configuration,以及application模式所需的配置ApplicationConfiguration,包括入口类,jar包参数,最后

    ???//?用传进来的命令行参数构造出来flink的配置对象Configuration
    ?final?Configuration?effectiveConfiguration?=?getEffectiveConfiguration(
    ????activeCommandLine,?commandLine,?programOptions,?Collections.singletonList(uri.toString()));
    ????
    ??//构造包含入口类和jar包参数的配置ApplicationConfiguration
    ??final?ApplicationConfiguration?applicationConfiguration?=
    ????new?ApplicationConfiguration(programOptions.getProgramArgs(),?programOptions.getEntryPointClassName());
    ????
    ??deployer.run(effectiveConfiguration,?applicationConfiguration);

    构造ClusterDescriptor

    上面的方法会进入
    ApplicationClusterDeployer的run方法,在这里会根据配置使用工厂类构造不同的ClusterDescriptor,比如是k8s的话会构造
    KubernetesClusterDescriptor,部署在yarn的话会构造YarnClusterDescriptor。之后会通过deployApplicationCluster来部署application模式的flink程序。

    Deploy Application Cluster

    我们这里以yarn集群为例,进入YarnClusterDescriptor#deployApplicationCluster方法,在这个方法里,我们看到经过一些简单的检查之后,调用了private方法YarnClusterDescriptor#deployInternal,这个deployInternal是一个提供公共功能的方法,可以看下其他的部署模式,yarn session模式,per job模式,都是调用的这个方法,只是参数不同而已。

    我们简单看下这个方法:

    ?/**
    ??*?This?method?will?block?until?the?ApplicationMaster/JobManager?have?been?deployed?on?YARN.
    ??*
    ??*?@param?clusterSpecification?一些配置参数
    ??*?@param?applicationName?yarn?job的名字
    ??*?@param?yarnClusterEntrypoint?入口类
    ??*?@param?jobGraph?程序的jobGraph,可为空
    ??*?@param?detached?是否是隔离模式
    ??*/
    ?private?ClusterClientProvider?deployInternal(
    ???ClusterSpecification?clusterSpecification,
    ???String?applicationName,
    ???String?yarnClusterEntrypoint,
    ???@Nullable?JobGraph?jobGraph,
    ???boolean?detached)?throws?Exception?{

    在这个方法里,将会根据不同的部署模式做一些必要的检查,然后启动yarn容器的操作。比如per job模式,上传flink jar包等等,都是在这个方法完成的。此外,该方法会一直阻塞到
    ApplicationMaster/JobManager部署成功,之后会进入用户程序的入口类
    ApplicationClusterEntryPoint来执行用户程序。

    ApplicationClusterEntryPoint

    yarn组件启动完成之后,开始执行用户的程序,在这个类里,会做以下的一些工作:

    • 下载必要的jar或者resources
    • 进行leader选举,决定谁执 main 方法
    • 用户程序退出时终止集群
    • 保证HA和容错

    application模式提交任务到yarn集群,大概的流程就先讲到这里,flink任务执行的流程,后续再写篇文章专门介绍。

    更多精彩信息,欢迎关注我的公众号【大数据技术与应用实战】

    控制面板
    您好,欢迎到访网站!
      查看权限
    网站分类
    最新留言