1. 首页
  2. > 税务筹划 >

向集群提交作业的命令是什么意思(redis集群不能使用的命令)

这篇文章主要介绍Flink集群的多种部署方式以及如何对它进行高可用提交配置,并讨论Flink主进程及工作进程等重要配置参数。这篇文章不仅仅介绍Flink集群的搭建,还会介绍如何配置Flink集群。


二 独立集群部署

一个 Flink 独立集群至少包含一个主进程和一个 TaskManager 进程,可以运行在一台或多台机器上。所有进程都是作为普通的不能 JVM 进程来运行。图1-1 展示了一个Flink 独立集群的工作模式。


图1-1 Flink的独立集群模式


主进程会启动单独的线程来运行 Dispatcher 和 ResourceManager。一旦它们运行起来, TaskManager 就会将自己注册到 Reso urceManager 中。图1-2 展示了向独立集群提交作业的过程。


图1-2 向Flink独立集群提交应用


客户端将作业提交到 Dispatcher ,后者会在内部启动一个 JobManager 线程并提供用于执行的 JobGraph 。 JobManager 向 ResourceManager 申请必要数量的处理槽,并在处理槽准备完毕后将作业部署执行。


在独立集群部署模式下,主进程和工作进程不会因为故障而重启。只要有足够多的剩余处理槽,作业就可以从工作进程故障中恢复。我们可以通过运行一个或多个后备工作进程来对此进行保障。从主进程故障中恢复作业需要进行高可用设置,我们会在后面讨论它。


2.1 集群规划


2.2 集群部署

为了搭建一个 Flink 独立集群,需要从 Apache Flink 的官网下载它的二进制发行版,然后利用以下命令对 tar 文件进行解压。


2.2.1 解压安装包

[bigdata@bigdata12 software]$ tar -zxvf flink-1.10.3-bin-scala_2.12.tgz -C /opt/module/

解压后的目录有一个 ./bin 的文件夹,里面包含了用于启动和停止 Flink 进程的 bash 脚本。 .用的/bin/start-cluster.sh 脚本会在本地机器上启动一个主进程, 在本地或远程机器上启动一个或多个 TaskManager。


2.2.2 修改flink-conf.yam向l文件

F集群link 中预定义的本地设置模式会在本地机器分别启动一个主进程和一个TaskManager。为了在多台机器上搭建 Flink 集群,需要调整一下flink-conf.yaml文件的默认配置。 在./conf/flink-conf.yaml文件中将 jobmanager.rpc.address 一项配置为主进程所在机器的主机名(或 IP 地址) 。


[big的data@bigdata12 conf]$ pwd /opt/module/flink-1.10.3/conf [bigdata@bigdata12 conf]$ vi flink-conf.yaml jobmanager.rpc.address: bigdata12

2.2.3 修改masters文件

[bigdata@bigdata12 conf]$ vi masters bigdata12:8081

2.2.4 修改slaves文件

将所有需要运行 TaskManager 的主机名(或 IP 地址)列在./conf/slaves 文件中。


bigdata13 bigdata14

2.2.5 分发 flink-1.10.3 文件夹

向集群的所有节点分发配置好的Flink安装包,可以用rsync或者scp方式。我根据rsync原理封装了一个自动同步脚本。可以把安装包自动分发给bigdata13和bigdata14


[bigdata@bigdata12 module]$ xsync flink-1.10.3

2.2.6 启动集群

一切准备就绪后,你就可以利用. /bin/start-cluster. sh脚本来启动 Flink 集群。该脚本会在本地启用一个 JobManager,并针对 slaves 文件中的每一个条目启动一个 TaskManager。


[bigdata@bigdata12 flink-1.10.3]$ bin/start-cluster.sh Starting cluster. Starting standalonesession daemon on host bigdata12. Starting taskexecutor daemon on host bigdata13. Starting taskexecutor daemon on host 用的bigdata14.

2.2.7 查看集群状态

可以通过封装命令的方式查看每个节点的状态(我自行编写了一个查看后台进程状态的脚本),或者通过访问主进程所在机器的 Web UI 检查主进程的启动以及 TaskManager 的注册情况。


① 通过命令查看后台进程


[bigdata@bigdata12 flink-1.10.3]$ jpsall =============== bigdata12 =============== 2272 StandaloneSessionClusterEntrypoint 2355 Jps =============== bigdata13 =============== 2289 Jps 2216 TaskManagerRunner =============== bigdata14 =============== 2277 Jps 2204 TaskManagerRunner [bigdata@bigdata12 flink-1.10.3]$

② 通过web界面查看flink集群



2.2.8 关闭集群

可以利用./bin/stop-cluster.sh脚本停止分布式独立集群。


[bigdata@bigdata12 flink-1.10.3]$ bin/stop-cluster.sh

三 独立集群的高可用配置

集群的高可用是保证流式应用程序持续运行的基础。同时,需要保证应用程序在遇到故障时能够自动恢复。虽然工作进程的故障
可以由 ResourceManager 来解决,但 JobManager 组件的故障恢复就需要额外的高可用( HA )配置。


Flink的JobManager中存放了应用以及相关元数据,例如应用的JAR 文件、 JobGraph 以及已完成检查点的路径信息。这些信息都需要在主进程发生故障时进行恢复。 Flink 的 HA 模式需要依赖Apache ZooKeeper以及某种持久化远程存储(例如HDFS、NFS或者S3)。JobManager 会将所有相关数据保存到持久化存储,并把存储路径写入 ZooKeeper 。 一旦发生故障,新的 JobManager 就可以从ZooKeeper中查找相关路径并根据它从持久化存储中加载元数据。闲话少许,我们继续配置独立集群的高可用。


不建议使用Flink安装包自带的ZooKeeper。


3.1 集群规划


3.2 集群部署

3.2.1 解压安装包

[bigdata@bigdata12 software]$ tar -zxvf flink-1.10.3-bin-scala_2.12.tgz -C /opt/module/

3.2.2 修改flink-conf.yaml文件

[bigdata@bigdata12 conf]$ vi flink-conf.yaml 意思# 指定高可用模式(必须) high-availability: zookeeper # Zookeeper提供分布式协调服务(必须) high-availability.zookeeper.quorum: bigdata12:2181,bigdata13:2181,bigdata14:2181 # 根ZooKeeper节点,在该节点下放置所有集群节点(推荐) high-availability.zookeeper.path.root: /flink # JobManager的元数据保存在文件系统storageDir中,只有指向此状态的指针存储在ZooKeeper中(必须) high-availability.storageDir: hdfs:///flink/ha # 自定义集群(推荐) high-availability.cluster-id: /flinkcluster # state checkpoint 模式 state.backend: filesystem # 检查点(checkpoint)生成的分布式快照的保存位置,默认是jobmanager的memory,但是HA模式必须配置在hdfs上 state.checkpoints.dir: hdfs:///flink/checkpoint

3.2.3 修改masters文件

bigdata12:8081 bigdata13:8081

3.2.4 修改slaves文件

bigdata12 bigdata13 bigdata14

3.2.5 拷贝hadoop支持的jar包

[bigdata@bigdata12 software]$ cp flink-shaded-hadoop-2-uber-2.8.3-10.0.jar /opt/能使module/flink-1.10.3/lib/

3.2.6 分发 flink-1.10.3 文件夹

[bigdata@bigdata12 module]$ xsyn作业c flink-1.10.3

3.2.7 启动集群

# 1 启动ZooKeeper集群 [bigdata@bigdata12 module]$ zkcluster.sh start # 2 启动HDFS [bigdata@bigdata12 hadoop-3.1.4]$ sbin/start-dfs.sh # 3 启动flink集群 [bigdata@bigdata12 flink-1.10.3]$ bin/start-cluster.sh Starting HA cluster with 2 masters. Starting standalonesession daemon on host bigdata12. Starting standalonesession daemon on host bigdata13. 提交Starting taskexecutor daemon on host bigdata12. Starting taskexecutor daemon on host bigdata13. Starting taskexecutor daemon on host bigdata14.

3.2.8 访问集群

可以通过下面两个地址访问flink集群


  • http://bigdata12:8081/#/job-manager/logs
  • http://bigdata13:8081/#/job-manager/logs

3.2.9 测试HA切换

没有发现Flink集群能在哪直接查询Leader节点,只能通过log查看leader还是standby状态:


3.2.9.1 查看bigdata12的日志

可以看出bigdata12是leader。也就是active状态。



3.2.9.2 查看bigdata13的日志

hadoop13的日志没有leadership标识,也就是为standby状态。



手动杀死bigdata12激活状态的jobmanager:


[bigdata@bigdata12 software]$ jps 7072 DataNode 8753 StandaloneSessionClusterEntrypoint 9956 Jps 9101 TaskManagerRunner 6910 NameNode 5774 QuorumPeerMain [bigdata@bigdata12 software]$ kill -9 8753

3.2.9.3 再次查看bigdata13的日志

显示bigdata13为leader状态。



3.2.9.4 重启bigdata12的jobmanager

[bigdata@bigdata12 flink-1.10.3]$ bin/jobmanager.sh start [bigdata@bigdata12 flink-1.10.3]$ jps 7072 DataNode 10245 StandaloneSessionClusterEntrypoint 10295 Jps 9101 TaskManagerRunner 6910 NameNode 5774 QuorumPeerMain

3.2.9.5 查看bigdata12的日志

没有那个授权leader信息,代表bigdata12现在就是一个standby状态。



三YARN 上的 HA 安装配置

YARN 是一个集群资源和容器的管理器。默认情况下它会自动重启发生故障的主进程容器和 TaskManager 容器。因此你无须在 YARN 中设置后备进程就能实现HA 。Flink 的主进程是作为 YARN ApplicationMaster 启动的。YARN会自动重启故障的ApplicationMaster,但会跟踪并限制重启次数以防出现无限的恢复循环。


3.1 集群规划


3.2 集群部署

3.2.1 解压安装包

[bigdata@bigdata12 software]$ tar -zxvf flink-1.10.3-bin-scala_2.12.tgz -C /opt/module/

3.2.2 修改flink-conf.yaml文件

[bigdata@bigdata12 conf]$ vi flink-conf.yaml # 指定高可用模式(必须) high-availability: zookeeper # Zookeeper提供分布式协调服务(必须) 作业high-availability.zookeeper.quorum: bigdata12:2181,bigdata13:2181,bigdata14:2181 # 根ZooKeeper节点,在该节点下放置所有集群节点(推荐) high-availability.zookeeper.path.root: /flink # JobManager的元数据保存在文件系统storageDir中,只有指向此状态的指针存储在ZooKeeper中(必须) high-availability.storageDir: hdfs:///flink/ha # 自定义集群(推荐) high-availability.cluster-id: /flinkcluster # state checkpoint 模式 state.backend: filesystem # 检查点(checkpoint)生成的分布式快照的保存位置,默认是jobmanager的memory,但是HA模式必须配置在hdfs上 state.checkpoints.dir: hdfs:///flink/checkpointredis # 配置应用尝试重启的最大次数 yarn.application-attempts: 4

3.2.3 修改masters文件

bigdata12:8081 bigdata13:8081

3.2.4 修改slaves文件

bigdata12 bigdata13 bigdata14

3.2.5 拷贝hadoop支持的jar包

[bigdata@bigdata12 software]$ cp flink-shaded-hadoop-2-uber-2.8.3-10.0.jar /opt/module/flink-1.10.3/lib/

3.2.6 修改yarn-site.xml文件

需要像下面这样在 YARN 配置文件 yarn-site.xml 中配置ApplicationMaster 的最大重启次数:


<property> <name>yarn.resourcemanager.am.max-attempts</name> <value>4</value> <description> The maximum number of application master execution attempts. Default value is 2, i.e., an application is restarted at most once. </description> </property>

3.2.7 分发 flink-1.10.3 文件夹和yarn-site.xml

[bigdata@bigdata12 module]$ xsync flink-1.10.3 [bigdata@bigdata12 modredisule]$ xsync /opt/modu向le/hadoop-2.7.5/etc/hadoop/yarn-site.xml

3.2.8 删除ZooKeeper上的元数据节点

[zk: localhost:2181(CONNECTED) 0] ls / [admin, brokers, cluster, config, consumers, controller_epoch, flink, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper] 不能[zk: localhost:2181(CONNECTED) 1] deleteall /flink [zk: localhost:2181(CONNECTED) 2] ls / [admin, brokers, cluster, config, consumers, controller_epoch, isr_change_notification, latest_produ是什么cer_id_block, log_dir_event_notification, zookeeper]

3.2.9 删除HDFS上的存储目录

[bigdata@bigdata12 conf]$ hadoop fs -rm -r /flink/*

Flink 提供了两种在 yarn 上运行的模式,分别为 Session-Cluster 和 Per-Job-Cluster模式。


3.3 Session-cluster模式


Session-Cluster 模式需要先启动集群,然命令后再提交作业,接着会向 yarn 申请一块空间后,资源永远保持不变。如果资源满了,下一个作业就无法提交,只能等到yarn 中的其中一个作业执行完成后,释放了资源,下个作业才会正常提集群交。所有作业共享 Dispatcher 和 ResourceManager;共享资是什么源;适合规模小执行时间短的作业。


在 yarn 中初始化一个 flink 集群,开辟指定的资源,以后提交任务都向这里提交。这个Flink 集群会常驻在 yarn 集群中,除非手工停止。


3.3.1 Session Cluster

3.3.1.1 启动 hadoop 集群

# 自己封装的启动脚本 [bigdata@bigdata12 hadoop-3.1.4]$ hadoopcluster.sh start

3.3.1.2 启动 yarn-session

[bigdata@bigdata12 flink-1.10.3]$ bin/yarn-session.sh -n 2 -s 2 -jm 512 -tm 512 -nm flink.node -d

  • 其中:
    -n(--container): TaskManager 的数量。
    -s(--slots): 每个 TaskManager 的 slot 数量,默认一个 slot 一个 core,默认每个
    taskmanager 的的 slot 的个数为 1,有时可以多一些 taskmanager,做冗余。
    -jm: JobManager 的内存(单位 MB)。
    -tm:每个 taskmanager 的内存(单位 MB)。
    -nm: yarn 的 appName(现在 yarn 的 ui 上的名字)。
    -d:后台执行。

3.3.1.3 去 yarn 能使控制台查看任务状态

http://bigdata12:8088/cluster/apps/FINISHED


3.3.1.4 取消 yarn-session

yarn application --kill application_id

3.3.2 Per Job Cluster

  • 启动 hadoop 集群(略)
  • 不启动 yarn-session,直接执行 job

四系统配置

Apache Flink 提供了很多用来配置其行为以及调节性能的参数。所有这些参数都可以在.conf/flink-conf.yaml文件中定义。该文件是一个存储键值的扁平化 YAML 文件,会供不同组件(如意思启动脚本、主进程和工作进程以及 CLI 客户端)读取使用。启动脚本(如./bin/start-client.sh)会解析该配置文件获得命令 JVM 参数以及堆大小设置, CLI 客户端 (./bin/flink ) 会从中获得连接信息以访问主进程。所有对于民配置文件的修改都需要重启 Flink 才能生效。


4.1 Java的类加载

Flink默认会使用 PATH 环境变量中的 Java 执行文件来启动 JVM 进程 。 如果PATH中没有配置 Java 或者需要使用其他Java版本,你可以利用配置文件中的JAVA_HOME 环境变量来指定 Java 安装目录。


Flink的JVM进程支持在启动时自定义Java选项。例如,你可以使用 env.java.opts、 evn.java.opts.jobmanager 及 env.java.opts.taskmanager 来对垃圾收集器进行微调或开启远程调试。


4.2 CPU

Flink本身不会主动限制消耗的 CPU 资源量 ,但会采用处理槽来控制可以分配给工作进程(TaskManager)的任务数。每个TaskManager都能提供一定数量的处理槽,这些处理槽由ResourceManager 统一注册和管理。 JobManager 需要请求一个或多个处理槽来执行应用。每个处理槽可以处理应用的一个“切片”,即应用程序每个算子的一个并行任务。因此, JobManager 至少需要获取和应用算子最大并行度等量的处理槽。任务会在工作进程( TaskManager )中以线程方式执行,它们可以按需获取足够的 CPU 资源。


TaskManager 所提供的处理槽数量是由配置文件中的 taskmanager.numberOfTaskSlots 项来指定的。它的默认值是每个 TaskManager 一个处理槽。通常你只需要在独立集群模式下设置处理槽的数量,因为基于资源管理框架(YARN、Kubernetes、Mesos)运行 Flink 可以轻松在每个计算节点上启动多个(单处理槽的) TaskManager 。


4.3内存和网络缓冲

Flink 的主进程和工作进程对内存有着不同需求。主进程主要管理计算资源(ResourceManager)和协调应用(JobManager)执行;而工作进程要负责繁重的工作以及处理潜在规模庞大的数据。


通常,主进程对于内存的要求并不苛刻,它默认的 JVM 堆内存数量只有lGB,但如果主进程需要管理多个应用或某个应用具有很多算子,你可能需要利用 jobmanager.heap.size 配置项来增加 JVM 堆的容量。配置工作进程的内存要复杂一些,因为会有多个组件分别占用不同类型的内存。


五 总结

我们在这篇文章中讨论了如何针对不同环境搭建 Flink 集群以及如何进行 HA 设置。我们讨论了几个重要的配置选项。


如有描述不当之处,还请在评论区指正。


版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至123456@qq.com 举报,一经查实,本站将立刻删除。

联系我们

工作日:9:30-18:30,节假日休息