首页
Search
1
Linux免密登陆-ubuntu
976 阅读
2
Redis集群部署方案
953 阅读
3
数据同步工具DataX、Sqoop和Canal
917 阅读
4
Hadoop各版本汇总
906 阅读
5
Spark学习笔记
884 阅读
大数据
Flink
后端
Java
笔记
运维
游客
Search
标签搜索
大数据
Flink
离线
实时
Redis
OpenJDK
Java
笔记
JVM
Elasticsearch
GC
Hadoop
Hudi
Flink CDC
K8S
数据湖
WD1016
累计撰写
56
篇文章
累计阅读
12.4万
次
首页
栏目
大数据
Flink
后端
Java
笔记
运维
页面
搜索到
7
篇与
Flink
的结果
返回首页
2025-11-28
Hudi 源码贡献:引入Flink聚类策略,降低资源消耗,提升稳定性
在 Hudi 的设计中,为了保持数据文件大小合理、提升查询性能,引入了 clustering 后台表服务。Clustering 的核心目的,是将多个小文件(small files)合并或重写为更大、更均衡的文件,从而减少读取和查询过程中因小文件过多带来的性能开销。然而,在实际生产环境中,会出现一个较为微妙但对稳定性和资源利用影响显著的问题 —— “尾部小文件(tail small file)”。具体来说,一次 clustering 执行后可能会残留一个体积非常小的文件,而在下一轮 clustering 周期中,这个小文件会再次被选中重写。如此反复,不仅浪费计算资源,还会导致 commit 和 metadata 的额外开销,甚至可能因为文件频繁更替,造成下游查询表现不稳定。准备一个分区,其数据量大约 720 MB,并配置如下参数:'clustering.plan.strategy.target.file.max.bytes' = '650000000'在执行聚簇之前,该分区(例如 p_date=20251029)大约包含 10 个 Parquet 文件,每个文件大小从 8 MB 到 120 MB 不等。第一次运行 clustering 后,会生成一个大文件(大约 680 MB)和一个小文件(大约 50 MB)。这符合设定的目标文件大小,按理说不应该再有额外合并。然而,当 clustering 周期性地持续运行时,这个 50 MB 的小文件却持续被反复重写 —— 每次 clustering 执行都会生成这个小文件的新版本。由于启用了 “clean-retain-commits” 配置,多个该小文件的历史版本被保留下来,导致不必要的文件膨胀和冗余提交。在生产环境中,各分区的数据量往往差异较大。即使对文件的最小/最大大小做了精细调优,也难以保证所有分区都能均匀地合并成大文件,因此产生“小尾巴文件”几乎是不可避免的。为此,我们引入了一种全新的聚类策略。该策略会综合考虑是否需要在聚类过程中进行排序等因素,并仅在 clustering 的 plan 构建阶段 添加 early-exit(提前退出) 条件,而不会改变 commit 或 execution 的整体流程。用户可通过参数 clustering.plan.strategy.class 启用该策略,完全兼容旧逻辑。该策略已测试并在生产环境稳定运行,可靠性经过验证。目前,这一策略已被社区正式采纳合并入主分支。对于 Hudi + Flink 场景下启用了 clustering 的用户——尤其是分区数据分布不均 的场景,该策略能够显著降低不必要的 clustering 开销,从而提升作业的稳定性与整体效率。如果你在生产环境中使用 Hudi + Flink 进行 clustering,强烈建议开启此新策略。对于绝大多数场景来说,这是一个几乎“零成本”、但收益十分明显的优化方案。
2025年11月28日
3,614 阅读
107 点赞
2025-08-02
湖仓一体流批协同实践:从Spark批量加速到Flink实时更新
在湖仓一体架构下,无论是业务库数据同步还是宽表构建,Flink虽然支持"先全量,再增量"的处理模式,但当历史数据规模达到亿级时,全量数据导入阶段存在瓶颈,如果调高并发会抢占实时集群资源,低并发会导致处理时间过长。因此,我们采用Spark+Flink的协同计算架构,首先基于Spark的离线计算优势,在离线集群完成大规模历史数据的批量导入,再启动Flink任务进行增量更新。本文将分享该方案落地过程中遇到的问题和解决方案。Spark加速全量数据入湖 配置写入方式为bulk insert,减少数据序列化以及合并操作,该数据写入方式会跳过数据去重,所以可以在hive中通过SparkSQL预处理历史数据。set hoodie.spark.sql.insert.into.operation=bulk_insert;如果历史数据在Hive中,表格式尽可能是orc或parquet格式,否则处理效率会显著下降,SparkSQL任务配置如下:executor-memory 16g num-executors 150 executor-cores 4 spark.default.parallelism 600上述配置在数据量较大的情况下容易OOM,如果存量数据过多,需要分多个批次入湖,这样耗时更长,作业更不稳定。因为数据入湖时数据分桶、写入缓冲都属于内存密集型操作,所以适当调大spark.memory.fraction,调小spark.memory.storageFraction(减小存储内存,增加执行内存),有利于加速入湖。spark.memory.fraction 0.8 spark.memory.storageFraction 0.3同时,在资源不变的情况下,适当增大并行度,低并行度时每个Task处理的数据量较大,排序操作的内存压力剧增,当单个Task的数据量超过可用内存时,Spark会触发磁盘溢写,并行度增大一倍后,每个Task处理的数据量减半,排序完全在内存中完成,避免Spill和GC,效率显著提升。spark.default.parallelism 1200最终配置如下,实现2分钟内15亿全量数据入湖:spark-sql --master yarn --queue ... --deploy-mode client --name ... \ --driver-memory 16g --driver-cores 8 --executor-memory 16g --num-executors 150 \ --conf spark.executor.heartbeatInterval=120000s \ --conf spark.network.timeout=130000s \ --conf spark.memory.fraction=0.8 --conf spark.memory.storageFraction=0.3 \ --executor-cores 4 --conf spark.default.parallelism=1200 \ --hiveconf hive.cli.print.header=true \ --jars hudi-spark3.2-bundle_2.12-0.14.1.jar \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \ --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \ --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \ --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar' -S全量数据入湖后小文件合并 增大并行度会生成很多小文件,在全量数据导入完成后,Flink任务启动前,通过Spark提交Clustering任务进行一次小文件合并。spark-submit --master yarn --deploy-mode cluster --queue ... \ --name ... \ --driver-memory 16g --executor-memory 16g --num-executors 150 \ --executor-cores 8 --conf spark.default.parallelism=1200 \ --class org.apache.hudi.utilities.HoodieClusteringJob hudi-utilities-bundle_2.12-0.14.1.jar \ --mode scheduleAndExecute \ --base-path hdfs://... \ --table-name ... \ --retry-last-failed-clustering-job \ --job-max-processing-time-ms 1800000 \ --hoodie-conf hoodie.clean.async=true \ --hoodie-conf hoodie.clean.automatic=true \ --hoodie-conf hoodie.cleaner.policy=KEEP_LATEST_FILE_VERSIONS \ --hoodie-conf hoodie.cleaner.fileversions.retained=1 \ --hoodie-conf hoodie.cleaner.parallelism=100 \ --hoodie-conf hoodie.clustering.async.enabled=true \ --hoodie-conf hoodie.clustering.async.max.commits=1 \ --hoodie-conf hoodie.clustering.execution.strategy.class=org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy \ --hoodie-conf hoodie.clustering.plan.strategy.max.num.groups=20000 \ --hoodie-conf hoodie.metadata.enable=false \ --hoodie-conf hoodie.clustering.plan.strategy.sort.columns=...使用KEEP_LATEST_FILE_VERSIONS清理策略,保留最后1个文件版本,避免使用成KEEP_LATEST_COMMITS,因为压缩前的文件仍关联原始提交时间,会导致合并前的小文件和合并后的大文件共存的问题。hoodie.cleaner.policy=KEEP_LATEST_FILE_VERSIONS hoodie.cleaner.fileversions.retained=1清理策略原理可参考:https://developer.aliyun.com/article/1457637 Flink index bootstrap加速 第一次启动实时任务需要打开索引引导函数,同时把开始消费时间回调到离线数据的截止时间,配置如下:'index.bootstrap.enabled'='true' 'scan.startup.mode'='timestamp' 'scan.startup.timestamp-millis' = ''数据量较大时,可以提高并发,同时调整下面三个配置,以确保上下游算子的并行度保持一致。否则,会导致大量不必要的网络IO,进而引发 checkpoint 长时间阻塞,最终可能导致任务启动失败。'write.tasks' = '100', 'write.bucket_assign.tasks' = '100', 'write.index_bootstrap.tasks'='100'初始化索引完成后,执行savepoint,减小任务的并行度,关闭index bootstrap,从savepoint重新启动作业。'write.tasks' = '12' 'write.bucket_assign.tasks' = '20' 'index.bootstrap.enabled'='false'由于savepoint保存了index bootstrap算子信息,关闭index bootstrap后,会导致作业无法恢复,需要通过参数配置,允许跳过无法还原的保存点状态。作业为Per-Job模式时,启动任务配置--allowNonRestoredState参数即可。flink run -d -t yarn-per-job -Dyarn.application.queue=... -Dparallelism.default=6 \ -Dtaskmanager.numberOfTaskSlots=3 -Djobmanager.memory.process.size=2048m \ -Dexecution.checkpointing.snapshot-compression=true \ -Dexecution.checkpointing.local-backup.enabled=true \ -Dexecution.state-recovery.from-local=true \ -s hdfs://.../savepoint/savepoint-f36c2e-ada1c3edc7f2 --allowNonRestoredState \ -Dtaskmanager.memory.process.size=2048m -Dtaskmanager.memory.managed.fraction=0.3 \ -Dstate.backend.rocksdb.log.size=100m -Dyarn.application.name=... -c ... ....jar \ ... -stateBackendType 2 -externalizedCheckpointCleanup RETAIN_ON_CANCELLATION -enableIncremental true作业是Flink SQL或者Application模式时,并不支持在任务启动命令上配置--allowNonRestoredState参数,设置下面参数后并不生效,作业仍然会启动失败。execution.savepoint.ignore-unclaimed-state=true如果作业需要以Application模式运行,目前最佳的方式是先通过Per-Job模式恢复作业,再次执行savepoint,把作业从该savepoint以Application模式的方式拉起即可。
2025年08月02日
678 阅读
35 点赞
2023-12-30
Flink + Hudi 流批一体作业稳定性优化
目前,任务分为两种类型:当业务逻辑较为简单时,使用 Flink SQL 进行处理,例如将原始日志或业务库同步至 Hudi 的 ODS 层、进行多表关联和聚合等操作;当业务逻辑比较复杂或需要特殊处理时,例如部分数据需要通过 API 获取,则使用 Flink DataStream API 消费 Hudi 数据,并经过一系列处理后,写入 Hudi 的 DWD、DWS 或 ADS 层。针对内存方面的问题,对Flink写入Hudi任务多个Task的Heap使用率进行了监控。注意到使用率存在较大的波动,有时甚至可能超出分配的阈值,这种情况可能导致任务被强制终止。Yarn Application报错信息: org.apache.flink.runtime.entrypoint.ClusterEntrypoint - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested. Association with remote system [akka.tcp://flink] has failed, address is now Association with remote system [akka.tcp://flink] has failed,address is now gated for [50] ms NodeManager Local logs报错信息: org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Container [pid=13427,containerID=container_e632_1670877056012_20807318_01_000001] is running beyond physical memory limits. Current usage: 2.0 GB of 2 GB physical memory used; 4.5 GB of 4.2 GB virtual memory used. Killing container.可以通过在yarn-site.xml中将yarn.nodemanager.vmem-check-enabled设为false,或者为jobmanager和taskmanager分配足够的内存来提高任务的稳定性。<property> <name>yarn.nodemanager.vmem-check-enabled</name> <value>false</value> </property> 注:是否启用一个线程检查每个任务正在使用的虚拟内存量,如果任务超出了分配值,则直接将其kill,默认是true另一个问题是,Managed Memory的使用率较低,而Task Heap的使用率却相当高。在增加TaskManager的内存分配(通过-ytm参数)时,会根据taskmanager.memory.managed.fraction的设置将新增的内存分配给Managed Memory。这可能会造成资源浪费。适当减小该配置的值有助于提高整体内存利用率。taskmanager.memory.managed.fraction: 0.2使用Hudi写入可能导致节点过载,从而导致上游算子背压。在执行SQL时,可以考虑提高写入并行度。如果是DateStream API Hudi到Hudi的数据传输,由于数据分布可能不均,建议在读取后直接使用rebalance()算子进行数据均衡处理。对于当前分区文件数量问题,写入Hudi任务的并行度会直接影响文件数量,随着并行度的增加,文件数量也会相应增加。此外,对于COPY_ON_WRITE模式,以下三个参数会直接影响文件数量。任务在运行一段时间后,当合并操作和触发操作达到平衡时,文件数量会在很小范围内波动。此外,可以通过减小clustering.delta_commits来减少当前分区文件数量。clustering.delta_commits 触发合并文件所需的提交次数 clean.retain_commits 保留的提交数,不进行清理 cleaner.policy(KEEP_LATEST_COMMITS)相对于在线clustering而言,离线clustering更加稳定并且可以实现资源隔离。经过大量任务长时间运行后来看,在使用离线clustering时需要注意以下问题。常用离线clustering命令如下:nohup flink run \ -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob \ lib/${bundle_jar_name}.jar \ --service \ --path ${table_path} > clustering.log & \以上离线clustering运行时,--plan-partition-filter-mode默认为NONE,所有符合条件的分区都会加入到任务当中,根据使用情况来看,随着历史分区的增加,资源使用逐渐处于较高的水平,该模式比较适用于临时处理大量滞留文件。同时,当数据量较大时还存在一个问题,当Clustering异常时,重启任务,偶尔会报错提示历史分区某parquet不存在(社区群有同学反馈过该问题)。通过配置参数--plan-partition-filter-mode RECENT_DAYS --target-partitions 2可以解决该问题。还可以使用--plan-partition-filter-mode的SELECTED_PARTITIONS模式,划定历史分区的范围,处理积压问题。Flink离线Clustering使用建议,分区过滤模式RECENT_DAYS和NONE的组合使用相对较为稳定,适用于线上环境。当历史分区中存在大量待Clustering的文件时,可以临时使用NONE模式压缩几次。等到压缩速度跟得上时,再切换回RECENT_DAYS模式进行进一步的处理,最终的离线Clustering命令:常规: nohup flink run \ -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob \ lib/hudi-flink1.14-bundle-0.12.3.jar \ --service \ --clustering-delta-commits 8 \ --small-file-limit 90 \ --target-file-max-bytes 125829120 \ --plan-partition-filter-mode RECENT_DAYS \ --target-partitions 2 \ --path hdfs://...... > clustering.log & 处理积压: flink run -c \ org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob \ lib/hudi-flink1.14-bundle-0.12.3.jar \ --service \ --min-clustering-interval-seconds 3 \ --max-num-groups 1000 \ --clustering-delta-commits 1 \ --small-file-limit 90 \ --target-file-max-bytes 125829120 \ --path ${target_path}在MERGE_ON_READ模式下,当前分区的文件数量与compaction.delta_commits、clean.retain_commits有关。但是当clean.retain_commits大于10时,可能会出现parquet和log文件无法被清理干净的情况,导致部分小文件滞留在历史分区中。HoodieFlinkClusteringJob类可以接受多个参数,可以在源码org.apache.hudi.sink.clustering.FlinkClusteringConfig中或使用如下命令查看:flink run \ -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob \ ../flink-1.14.2/lib/hudi-flink1.14-bundle-0.12.3.jar \ -h需要注意的是Boolean类型的参数,默认都为false,如果要开启,直接使用--param_name即可,而不是--param_name true,Hudi中解析参数使用的JCommander,如下图所示,如果Boolean参数后面跟true、false、0、1,都会在遍历参数时错位,导致任务提交失败,报错提示和这种配置方式并不是很友好,很容易让使用者误认为该参数是不可配置的!通过适当增加以下两个超时参数,可以有效避免因网络波动等原因导致的超时问题,从而降低任务失败的概率。set execution.checkpointing.timeout=600000 properties.request.timeout.ms=120000(connector为kafka时)以下参数设置为true时,如果TaskManager发生了Akka错误,例如无法与JobManager通信或其他网络问题,TaskManager将会立即退出;设置为false时,TaskManager将不会主动退出,而是继续运行,这可能会导致系统处于不稳定状态,并且需要人工干预来解决问题。taskmanager.exit-on-fatal-akka-error: true改动Hudi源码或者适配低版本大数据组件,参考了如下文章,可顺利完成编译打包:https://blog.csdn.net/weixin_45417821/article/details/127407461mvn clean package -DskipTests -Drat.skip=true -Pflink-bundle-shade-hive3 -Dflink1.14 -Dscala-2.12 mvn clean install -DskipTests -Dscala-2.12 -Pflink-bundle-shade-hive3
2023年12月30日
2,129 阅读
65 点赞
2023-07-01
Flink流批一体作业管理平台
前言随着大数据处理需求的不断增长,流批一体作业管理平台的重要性愈发凸显。本文将介绍如何针对 Flink + Hudi 流批一体任务进行管理,特别针对 Hudi 任务的需求,支持一键启动离线 Compaction 和 Clustering 任务,保证数据湖的稳定运行。背景在大数据处理领域,简化作业的提交、监控和管理是一项重要的任务。调研过程中发现,现有的开源系统如 Dinky 和 StreamPark 在任务生成和监控方面存在一定局限性。它们使用了 Flink 或 YARN 源码中的 API,如 org.apache.hadoop.yarn.client.api.YarnClient 、 org.apache.hadoop.hdfs.DistributedFileSystem 等,导致系统与 Hadoop 集群和 Flink 客户端版本之间耦合度高,Hadoop集群使用权限要求较高。系统开发基于项目 flink-streaming-platform-web 进行开发,通过优化和功能扩展(如:对使用FlinkSQL操作Hudi表的支持),构建了一个无侵入性、与 Hadoop 集群和 Flink 客户端版本解耦的流批一体作业调度平台。只需在装有 Flink 客户端的机器上启动系统,即可轻松使用,所有任务均使用 Flink 自带的命令进行提交,如:# 提交 Compaction 任务 flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor ... # 提交 Clustering 任务 flink run -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob ...核心功能流任务管理支持 FlinkSQL 和 JAR 提交任务,提供流式作业的提交、监控、告警和日志查看等功能,为用户提供全方位的作业管理服务。批任务调度支持 FlinkSQL 和 JAR 提交任务,可定时调度批处理作业,定时完成数据处理任务,提高数据处理效率。数据湖管理(Hudi)特别针对 Hudi 任务的需求,系统支持对 MOR(Merge On Read)和 COW(Copy On Write)模式下的离线 Compaction 和离线 Clustering 任务进行管理,保证数据湖的稳定运行和数据质量。架构设计该平台的架构设计是其高效运行的关键,采用了模块化设计,将流任务管理、批任务调度和数据湖管理等功能模块化,实现了高度的灵活性和可扩展性。每个模块都具有清晰的职责和接口,使得系统易于维护和扩展。流任务管理模块负责接收用户提交的流式作业,并将其转换为 Flink 任务进行执行。这个模块需要实现任务的监控和告警功能,以确保作业的稳定运行。采用了分布式监控系统来实现实时监控和告警,保障了系统的高可用性和可靠性。批任务调度模块则负责定时调度批处理作业,并在预定的时间点执行数据处理任务。这个模块需要考虑到作业的依赖关系和执行顺序,以确保数据处理任务按时完成。采用了依赖调度策略来解决这个问题,有效地提高了作业的执行效率。数据湖管理模块是该平台的重要组成部分,特别针对 Hudi 任务的需求进行了优化。实现了对 MOR 和 COW 模式下的离线 Compaction 和离线 Clustering 任务的管理,确保了数据湖的稳定运行和数据质量。该平台极大地提升了流批一体作业创建的效率和灵活性,提供了更便捷、可靠的作业管理解决方案。未来,将持续优化系统功能,以满足需求,并助力流批一体作业的稳定运行和管理。
2023年07月01日
1,473 阅读
91 点赞
2023-03-25
Flink on Kubernetes 计算和存储分离实践
云原生已成为业界的主要趋势之一。将Flink从Yarn迁移到Kubernetes平台带来了许多优势。在这种架构下,将计算和存储解耦,计算部分运行在Kubernetes上,而存储则使用HDFS等分布式存储系统。这样的架构优势在于可以根据实际情况独立调整计算和存储资源,从而提高整体的效率和弹性。本文将介绍四种Flink在Kubernetes上的部署模式。其中,两种是基于Native Kubernetes部署的,分别有Session模式和Application模式。另外两种是基于Flink Kubernetes Operator部署的,同样包括Session模式和Application模式。首先介绍基于Flink Kubernetes Operator部署的Application模式。如果要运行自己编写的jar包,需要先构建一个镜像。如果使用了HDFS、Hudi等其他组件,还需要在Dockerfile中将Hadoop客户端和配置文件复制到镜像中,并设置相应的环境变量。同时,将所有依赖的jar包复制到Flink Home的lib目录下。FROM flink:1.16.1-scala_2.12 USER root RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone ADD hadoop-3.1.1.tar.gz /opt ADD jdk1.8.0_121.tar.gz /opt RUN rm /opt/hadoop-3.1.1/share/hadoop/common/lib/commons-math3-3.1.1.jar RUN rm /opt/hadoop-3.1.1/share/hadoop/hdfs/lib/commons-math3-3.1.1.jar COPY commons-math3-3.6.1.jar /opt/hadoop-3.1.1/share/hadoop/common/lib/ COPY commons-math3-3.6.1.jar /opt/hadoop-3.1.1/share/hadoop/hdfs/lib/ RUN chmod -R 777 /opt/hadoop-3.1.1/share/hadoop/common/lib/ RUN mkdir -p /opt/hadoop/conf/ COPY yarn-site.xml /opt/hadoop/conf/ COPY core-site.xml /opt/hadoop/conf/ COPY hdfs-site.xml /opt/hadoop/conf/ COPY flink-shaded-hadoop-3-uber-3.1.1.7.0.3.0-79-7.0.jar $FLINK_HOME/lib/ COPY commons-cli-1.5.0.jar $FLINK_HOME/lib/ RUN mkdir $FLINK_HOME/mylib COPY xxx-1.0-SNAPSHOT.jar $FLINK_HOME/mylib RUN chown -R flink:flink $FLINK_HOME/mylib RUN echo 'export JAVA_HOME=/opt/jdk1.8.0_121 \n\ export HADOOP_HOME=/opt/hadoop-3.1.1 \n\ PATH=$PATH:$JAVA_HOME/bin \n\ PATH=$PATH:$HADOOP_HOME/bin'\ >> ~/.bashrc EXPOSE 8081构建镜像,在Dockerfile所在的目录中执行以下命令,确保该目录包含用于构建镜像的文件。docker build -t flink-native/flink-on-k8s-xxxx .安装helmcurl https://baltocdn.com/helm/signing.asc | sudo apt-key add -sudo apt-get install apt-transport-https --yes echo "deb https://baltocdn.com/helm/stable/debian/ all main" | sudo tee /etc/apt/sources.list.d/helm-stable-debian.list sudo apt-get update sudo apt-get install helm安装cert-manager组件,由它提供证书服务。kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml安装Flink Kubernetes Operatorhelm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.4.0/ helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator --namespace flink --create-namespace执行上述命令后,将会从ghcr.io/apache/flink-kubernetes-operator:7fc23a1镜像仓库拉取镜像。由于下载速度较慢,可以尝试从apache/flink-kubernetes-operator:7fc23a1仓库拉取镜像,然后为其添加标签docker tag apache/flink-kubernetes-operator:7fc23a1 ghcr.io/apache/flink-kubernetes-operator:7fc23a1如果在重新安装时遇到使用kubectl delete无法删除的情况,可以尝试以下命令来实现删除操作:kubectl patch crd/flinksessionjobs.flink.apache.org -p '{"metadata":{"finalizers":[]}}' --type=merge通过执行上述命令,可以成功删除该资源。查看自定义资源 kubectl get customresourcedefinition构建一个YAML文件来提交任务。其中,image指定了镜像,jarURI指定了jar包在镜像中的位置,entryClass指定了要执行的类,args指定了该类所需的参数:kind: FlinkDeployment metadata: name: flink-application-xxx spec: image: flink-native/flink-on-k8s-xxxx flinkVersion: v1_16 flinkConfiguration: taskmanager.numberOfTaskSlots: "n" serviceAccount: flink jobManager: resource: memory: "nm" cpu: n taskManager: resource: memory: "nm" cpu: n job: jarURI: local:///opt/flink/mylib/xxx-1.0-SNAPSHOT.jar entryClass: com.xxx.run.XXXJob parallelism: n upgradeMode: stateless args: ["hdfs://host:9000/data/input","hdfs://host:9000/data/output","n"]提交任务kubectl create -f xxx.yaml查看flinkdeployment kubectl get flinkdeployment查看日志kubectl logs -f deploy/flink-application-xxx{lamp/}Flink on K8S Session模式和Application模式需要安装Flink客户端,下载flink压缩包,解压即可设置命名空间首选项、赋权等kubectl create ns flink-native kubectl config set-context --current --namespace=flink-native kubectl create serviceaccount flink kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=cluster-admin --serviceaccount=flink-native:flink --namespace=flink-nativeSession模式,启动Flink集群bin/kubernetes-session.sh \ -Dkubernetes.cluster-id=xxx\ -Dkubernetes.container.image=flink-native/flink-on-k8s-xxxx \ -Dkubernetes.namespace=flink-native\ -Dkubernetes.service-account=flink \ -Dclassloader.check-leaked-classloader=false \ -Dkubernetes.rest-service.exposed.type=ClusterIP \ -Dtaskmanager.memory.process.size=4096m \ -Dkubernetes.taskmanager.cpu=2 \ -Dtaskmanager.numberOfTaskSlots=4 \ -Dresourcemanager.taskmanager-timeout=60000 \ -Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%"端口转发nohup kubectl -n flink-native port-forward --address 0.0.0.0 service/my-first-flink-cluster-rest 8081:8081 >port-forward.log &打开Flink Web UI,可以看到此时只有jobmanager向集群提交任务,运行测试任务bin/flink run -e kubernetes-session -Dkubernetes.namespace=flink-native -Dkubernetes.container.image=flink-native/flink-on-k8s-xxxx -Dkubernetes.rest-service.exposed.type=NodePort -Dkubernetes.cluster-id=my-first-flink-cluster examples/streaming/TopSpeedWindowing.jar运行自己的jar包bin/flink run -e kubernetes-session \ -Dkubernetes.namespace=flink-native \ -Dkubernetes.rest-service.exposed.type=NodePort \ -Dkubernetes.cluster-id=xxx \ -c com.xxx.run.XXXJob \ mylib/xxx-1.0-SNAPSHOT.jar hdfs://host:9000/data/input hdfs://host:9000/data/output 2查看pod,此时可以看到生成了taskmanagerkubectl get pod -o wide -A查看日志,使用以下命令可以看到测试程序TopSpeedWindowing的输出结果kubectl logs my-first-flink-cluster-taskmanager-1-1 -n flink-native查看任务列表:bin/flink list --target kubernetes-session -Dkubernetes.namespace=flink-native -Dkubernetes.jobmanager.service-account=flink -Dkubernetes.cluster-id=xxx 根据ID删除任务:bin/flink cancel --target kubernetes-session -Dkubernetes.namespace=flink-native -Dkubernetes.jobmanager.service-account=flink -Dkubernetes.cluster-id=xxxxr 3ff3c5a5e3c2f47e024e2771dc108f77Application模式bin/flink run-application \ --target kubernetes-application \ -Dkubernetes.cluster-id=xxx\ -Dkubernetes.container.image=flink-native/flink-on-k8s-xxxx \ -Dkubernetes.namespace=flink-native\ -Dkubernetes.service-account=flink \ -Dclassloader.check-leaked-classloader=false \ -Dkubernetes.rest-service.exposed.type=ClusterIP \ -c com.sohu.longuserprofile.run.TestJob \ local:///opt/flink/mylib/xxx-1.0-SNAPSHOT.jar hdfs://host:9000/data/input hdfs://host:9000/data/output 2Session模式只能提交本地(宿主机)jar包,Application模式只能使用local:///{lamp/}常用命令k8s web ui 登录token获取 kubectl -n kubernetes-dashboard describe secret $(kubectl -n kubernetes-dashboard get secret | grep dashboard-admin | awk '{print $1}') | grep token: 查看所有pod列表 kubectl get pod -o wide -A 查看pod详细信息 kubectl describe pod pod_name -n flink-native 删除deployment kubectl delete deployment/my-first-flink-cluster 进入pod kubectl exec -it -n flink-native pod_name /bin/bash 获得所有命名空间 kubectl get namespace 拷贝出来 kubectl cp -n application psqls-0:/var/lib/postgresql/data/pg_wal /home 拷贝进去 kubectl cp /home/dades/pg_wal -n application psqls-0:/var/lib/postgresql/data/pg_wal
2023年03月25日
1,115 阅读
6 点赞
1
2