首页
Search
1
Linux免密登陆-ubuntu
976 阅读
2
Redis集群部署方案
953 阅读
3
数据同步工具DataX、Sqoop和Canal
917 阅读
4
Hadoop各版本汇总
906 阅读
5
Spark学习笔记
884 阅读
大数据
Flink
后端
Java
笔记
运维
游客
Search
标签搜索
大数据
Flink
离线
实时
Redis
OpenJDK
Java
笔记
JVM
Elasticsearch
GC
Hadoop
Hudi
Flink CDC
K8S
数据湖
WD1016
累计撰写
56
篇文章
累计阅读
12.4万
次
首页
栏目
大数据
Flink
后端
Java
笔记
运维
页面
搜索到
30
篇与
WD1016
的结果
返回首页
2025-11-28
Hudi 源码贡献:引入Flink聚类策略,降低资源消耗,提升稳定性
在 Hudi 的设计中,为了保持数据文件大小合理、提升查询性能,引入了 clustering 后台表服务。Clustering 的核心目的,是将多个小文件(small files)合并或重写为更大、更均衡的文件,从而减少读取和查询过程中因小文件过多带来的性能开销。然而,在实际生产环境中,会出现一个较为微妙但对稳定性和资源利用影响显著的问题 —— “尾部小文件(tail small file)”。具体来说,一次 clustering 执行后可能会残留一个体积非常小的文件,而在下一轮 clustering 周期中,这个小文件会再次被选中重写。如此反复,不仅浪费计算资源,还会导致 commit 和 metadata 的额外开销,甚至可能因为文件频繁更替,造成下游查询表现不稳定。准备一个分区,其数据量大约 720 MB,并配置如下参数:'clustering.plan.strategy.target.file.max.bytes' = '650000000'在执行聚簇之前,该分区(例如 p_date=20251029)大约包含 10 个 Parquet 文件,每个文件大小从 8 MB 到 120 MB 不等。第一次运行 clustering 后,会生成一个大文件(大约 680 MB)和一个小文件(大约 50 MB)。这符合设定的目标文件大小,按理说不应该再有额外合并。然而,当 clustering 周期性地持续运行时,这个 50 MB 的小文件却持续被反复重写 —— 每次 clustering 执行都会生成这个小文件的新版本。由于启用了 “clean-retain-commits” 配置,多个该小文件的历史版本被保留下来,导致不必要的文件膨胀和冗余提交。在生产环境中,各分区的数据量往往差异较大。即使对文件的最小/最大大小做了精细调优,也难以保证所有分区都能均匀地合并成大文件,因此产生“小尾巴文件”几乎是不可避免的。为此,我们引入了一种全新的聚类策略。该策略会综合考虑是否需要在聚类过程中进行排序等因素,并仅在 clustering 的 plan 构建阶段 添加 early-exit(提前退出) 条件,而不会改变 commit 或 execution 的整体流程。用户可通过参数 clustering.plan.strategy.class 启用该策略,完全兼容旧逻辑。该策略已测试并在生产环境稳定运行,可靠性经过验证。目前,这一策略已被社区正式采纳合并入主分支。对于 Hudi + Flink 场景下启用了 clustering 的用户——尤其是分区数据分布不均 的场景,该策略能够显著降低不必要的 clustering 开销,从而提升作业的稳定性与整体效率。如果你在生产环境中使用 Hudi + Flink 进行 clustering,强烈建议开启此新策略。对于绝大多数场景来说,这是一个几乎“零成本”、但收益十分明显的优化方案。
2025年11月28日
3,614 阅读
107 点赞
2025-08-21
阿里云大数据计算调优:从资源管理到复杂查询加速的深度优化实践
云平台的性能与功能通常是用户最核心的关注点——性能直接影响计算任务的执行效率,而功能则决定了数据处理链路的流畅性与灵活性。近期在使用云平台大数据计算服务时,重点测试了其数据处理、SQL查询及并发执行等性能表现,同时评估了任务调度、权限管理和数据导入导出等关键功能的实用性。任务优化 MaxCompute常用参数set odps.sql.allow.fullscan=true; 不指定分局查询 set odps.sql.groupby.skewindata=true; 解决agg数据倾斜问题 set odps.service.mode=off; 如果打开,超过十分钟时,会被kill,增加执行时长 set odps.sql.hive.compatible=true; 开启Hive SQL兼容模式,降低任务迁移成本 set odps.task.wlm.quota=os_datagouptest3; 指定计算资源Quota 自动MapJoin的阈值,用于决定是否将小表数据广播set odps.optimizer.auto.mapjoin.threshold=4096000000; set odps.optimizer.enable.online.conditional.mapjoin=true; set odps.sql.split.dop={"xxx.table1":120, "xxx.table2": 10}; 读取并行度设置 set odps.optimizer.hbo.enable.new.signature=true; 历史执行信息进行查询优化的增强功能压力测试 MaxCompute并发跑任务,默认FIFO(先进先出),可配置成FAIR,资源抢占和分配情况如下FIFO 确保了公平的顺序,但可能导致头部阻塞,即早期任务(任务 1)垄断资源,延迟后续任务(任务 2 和 3),直到早期任务让步,从任务 2 和 3 的初始低值中显而易见。查询加速 MaxQA引擎,默认FAIR(公平调度),不可改:MaxQA引擎概述链接 任务并发情况如下FAIR调度支持抢占,如果一个任务超过其份额,系统可能暂停或回收其资源,重新分配给欠份额的任务,可见三个任务在各自的峰值(最高点)时期趋于均衡。DataWorks数据开发 定时任务可以是单个节点、也可以是工作流,创建时选择任务类型数据集成 官网链接Serverless资源组 官网链接Spark作业创建任务运行信息MaxCompute功能特性 开放存储(Storage API):数据不可直接访问,可通过Storage API访问(按量付费):官网链接Quota资源管理:分层管理、资源隔离、弹性伸缩:官网链接物化视图:根据历史作业和性能分析自动创建物化视图(AutoMV):官网链接数据安全:支持按项目、表(列级别、行级别)、资源、函数或实例维度的访问控制:官网链接近实时批流一体数仓:支持基于Flink等流计算的分钟级数据写入与秒级查询加速(MCQA2.0):官网链接任务调优:内置作业诊断与优化建议,包括了SQL调优、数据倾斜调优等:官网链接成本优化推荐:保障作业按时完成的前提下,生成更优的资源配置方案,降低成本:官网链接账单明细:支持历史作业分析、资源分账,可以详细统计任务的执行时长、资源使用、所属人等:官网链接
2025年08月21日
253 阅读
16 点赞
2025-08-02
湖仓一体流批协同实践:从Spark批量加速到Flink实时更新
在湖仓一体架构下,无论是业务库数据同步还是宽表构建,Flink虽然支持"先全量,再增量"的处理模式,但当历史数据规模达到亿级时,全量数据导入阶段存在瓶颈,如果调高并发会抢占实时集群资源,低并发会导致处理时间过长。因此,我们采用Spark+Flink的协同计算架构,首先基于Spark的离线计算优势,在离线集群完成大规模历史数据的批量导入,再启动Flink任务进行增量更新。本文将分享该方案落地过程中遇到的问题和解决方案。Spark加速全量数据入湖 配置写入方式为bulk insert,减少数据序列化以及合并操作,该数据写入方式会跳过数据去重,所以可以在hive中通过SparkSQL预处理历史数据。set hoodie.spark.sql.insert.into.operation=bulk_insert;如果历史数据在Hive中,表格式尽可能是orc或parquet格式,否则处理效率会显著下降,SparkSQL任务配置如下:executor-memory 16g num-executors 150 executor-cores 4 spark.default.parallelism 600上述配置在数据量较大的情况下容易OOM,如果存量数据过多,需要分多个批次入湖,这样耗时更长,作业更不稳定。因为数据入湖时数据分桶、写入缓冲都属于内存密集型操作,所以适当调大spark.memory.fraction,调小spark.memory.storageFraction(减小存储内存,增加执行内存),有利于加速入湖。spark.memory.fraction 0.8 spark.memory.storageFraction 0.3同时,在资源不变的情况下,适当增大并行度,低并行度时每个Task处理的数据量较大,排序操作的内存压力剧增,当单个Task的数据量超过可用内存时,Spark会触发磁盘溢写,并行度增大一倍后,每个Task处理的数据量减半,排序完全在内存中完成,避免Spill和GC,效率显著提升。spark.default.parallelism 1200最终配置如下,实现2分钟内15亿全量数据入湖:spark-sql --master yarn --queue ... --deploy-mode client --name ... \ --driver-memory 16g --driver-cores 8 --executor-memory 16g --num-executors 150 \ --conf spark.executor.heartbeatInterval=120000s \ --conf spark.network.timeout=130000s \ --conf spark.memory.fraction=0.8 --conf spark.memory.storageFraction=0.3 \ --executor-cores 4 --conf spark.default.parallelism=1200 \ --hiveconf hive.cli.print.header=true \ --jars hudi-spark3.2-bundle_2.12-0.14.1.jar \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \ --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \ --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \ --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar' -S全量数据入湖后小文件合并 增大并行度会生成很多小文件,在全量数据导入完成后,Flink任务启动前,通过Spark提交Clustering任务进行一次小文件合并。spark-submit --master yarn --deploy-mode cluster --queue ... \ --name ... \ --driver-memory 16g --executor-memory 16g --num-executors 150 \ --executor-cores 8 --conf spark.default.parallelism=1200 \ --class org.apache.hudi.utilities.HoodieClusteringJob hudi-utilities-bundle_2.12-0.14.1.jar \ --mode scheduleAndExecute \ --base-path hdfs://... \ --table-name ... \ --retry-last-failed-clustering-job \ --job-max-processing-time-ms 1800000 \ --hoodie-conf hoodie.clean.async=true \ --hoodie-conf hoodie.clean.automatic=true \ --hoodie-conf hoodie.cleaner.policy=KEEP_LATEST_FILE_VERSIONS \ --hoodie-conf hoodie.cleaner.fileversions.retained=1 \ --hoodie-conf hoodie.cleaner.parallelism=100 \ --hoodie-conf hoodie.clustering.async.enabled=true \ --hoodie-conf hoodie.clustering.async.max.commits=1 \ --hoodie-conf hoodie.clustering.execution.strategy.class=org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy \ --hoodie-conf hoodie.clustering.plan.strategy.max.num.groups=20000 \ --hoodie-conf hoodie.metadata.enable=false \ --hoodie-conf hoodie.clustering.plan.strategy.sort.columns=...使用KEEP_LATEST_FILE_VERSIONS清理策略,保留最后1个文件版本,避免使用成KEEP_LATEST_COMMITS,因为压缩前的文件仍关联原始提交时间,会导致合并前的小文件和合并后的大文件共存的问题。hoodie.cleaner.policy=KEEP_LATEST_FILE_VERSIONS hoodie.cleaner.fileversions.retained=1清理策略原理可参考:https://developer.aliyun.com/article/1457637 Flink index bootstrap加速 第一次启动实时任务需要打开索引引导函数,同时把开始消费时间回调到离线数据的截止时间,配置如下:'index.bootstrap.enabled'='true' 'scan.startup.mode'='timestamp' 'scan.startup.timestamp-millis' = ''数据量较大时,可以提高并发,同时调整下面三个配置,以确保上下游算子的并行度保持一致。否则,会导致大量不必要的网络IO,进而引发 checkpoint 长时间阻塞,最终可能导致任务启动失败。'write.tasks' = '100', 'write.bucket_assign.tasks' = '100', 'write.index_bootstrap.tasks'='100'初始化索引完成后,执行savepoint,减小任务的并行度,关闭index bootstrap,从savepoint重新启动作业。'write.tasks' = '12' 'write.bucket_assign.tasks' = '20' 'index.bootstrap.enabled'='false'由于savepoint保存了index bootstrap算子信息,关闭index bootstrap后,会导致作业无法恢复,需要通过参数配置,允许跳过无法还原的保存点状态。作业为Per-Job模式时,启动任务配置--allowNonRestoredState参数即可。flink run -d -t yarn-per-job -Dyarn.application.queue=... -Dparallelism.default=6 \ -Dtaskmanager.numberOfTaskSlots=3 -Djobmanager.memory.process.size=2048m \ -Dexecution.checkpointing.snapshot-compression=true \ -Dexecution.checkpointing.local-backup.enabled=true \ -Dexecution.state-recovery.from-local=true \ -s hdfs://.../savepoint/savepoint-f36c2e-ada1c3edc7f2 --allowNonRestoredState \ -Dtaskmanager.memory.process.size=2048m -Dtaskmanager.memory.managed.fraction=0.3 \ -Dstate.backend.rocksdb.log.size=100m -Dyarn.application.name=... -c ... ....jar \ ... -stateBackendType 2 -externalizedCheckpointCleanup RETAIN_ON_CANCELLATION -enableIncremental true作业是Flink SQL或者Application模式时,并不支持在任务启动命令上配置--allowNonRestoredState参数,设置下面参数后并不生效,作业仍然会启动失败。execution.savepoint.ignore-unclaimed-state=true如果作业需要以Application模式运行,目前最佳的方式是先通过Per-Job模式恢复作业,再次执行savepoint,把作业从该savepoint以Application模式的方式拉起即可。
2025年08月02日
678 阅读
35 点赞
2023-12-30
Flink + Hudi 流批一体作业稳定性优化
目前,任务分为两种类型:当业务逻辑较为简单时,使用 Flink SQL 进行处理,例如将原始日志或业务库同步至 Hudi 的 ODS 层、进行多表关联和聚合等操作;当业务逻辑比较复杂或需要特殊处理时,例如部分数据需要通过 API 获取,则使用 Flink DataStream API 消费 Hudi 数据,并经过一系列处理后,写入 Hudi 的 DWD、DWS 或 ADS 层。针对内存方面的问题,对Flink写入Hudi任务多个Task的Heap使用率进行了监控。注意到使用率存在较大的波动,有时甚至可能超出分配的阈值,这种情况可能导致任务被强制终止。Yarn Application报错信息: org.apache.flink.runtime.entrypoint.ClusterEntrypoint - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested. Association with remote system [akka.tcp://flink] has failed, address is now Association with remote system [akka.tcp://flink] has failed,address is now gated for [50] ms NodeManager Local logs报错信息: org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Container [pid=13427,containerID=container_e632_1670877056012_20807318_01_000001] is running beyond physical memory limits. Current usage: 2.0 GB of 2 GB physical memory used; 4.5 GB of 4.2 GB virtual memory used. Killing container.可以通过在yarn-site.xml中将yarn.nodemanager.vmem-check-enabled设为false,或者为jobmanager和taskmanager分配足够的内存来提高任务的稳定性。<property> <name>yarn.nodemanager.vmem-check-enabled</name> <value>false</value> </property> 注:是否启用一个线程检查每个任务正在使用的虚拟内存量,如果任务超出了分配值,则直接将其kill,默认是true另一个问题是,Managed Memory的使用率较低,而Task Heap的使用率却相当高。在增加TaskManager的内存分配(通过-ytm参数)时,会根据taskmanager.memory.managed.fraction的设置将新增的内存分配给Managed Memory。这可能会造成资源浪费。适当减小该配置的值有助于提高整体内存利用率。taskmanager.memory.managed.fraction: 0.2使用Hudi写入可能导致节点过载,从而导致上游算子背压。在执行SQL时,可以考虑提高写入并行度。如果是DateStream API Hudi到Hudi的数据传输,由于数据分布可能不均,建议在读取后直接使用rebalance()算子进行数据均衡处理。对于当前分区文件数量问题,写入Hudi任务的并行度会直接影响文件数量,随着并行度的增加,文件数量也会相应增加。此外,对于COPY_ON_WRITE模式,以下三个参数会直接影响文件数量。任务在运行一段时间后,当合并操作和触发操作达到平衡时,文件数量会在很小范围内波动。此外,可以通过减小clustering.delta_commits来减少当前分区文件数量。clustering.delta_commits 触发合并文件所需的提交次数 clean.retain_commits 保留的提交数,不进行清理 cleaner.policy(KEEP_LATEST_COMMITS)相对于在线clustering而言,离线clustering更加稳定并且可以实现资源隔离。经过大量任务长时间运行后来看,在使用离线clustering时需要注意以下问题。常用离线clustering命令如下:nohup flink run \ -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob \ lib/${bundle_jar_name}.jar \ --service \ --path ${table_path} > clustering.log & \以上离线clustering运行时,--plan-partition-filter-mode默认为NONE,所有符合条件的分区都会加入到任务当中,根据使用情况来看,随着历史分区的增加,资源使用逐渐处于较高的水平,该模式比较适用于临时处理大量滞留文件。同时,当数据量较大时还存在一个问题,当Clustering异常时,重启任务,偶尔会报错提示历史分区某parquet不存在(社区群有同学反馈过该问题)。通过配置参数--plan-partition-filter-mode RECENT_DAYS --target-partitions 2可以解决该问题。还可以使用--plan-partition-filter-mode的SELECTED_PARTITIONS模式,划定历史分区的范围,处理积压问题。Flink离线Clustering使用建议,分区过滤模式RECENT_DAYS和NONE的组合使用相对较为稳定,适用于线上环境。当历史分区中存在大量待Clustering的文件时,可以临时使用NONE模式压缩几次。等到压缩速度跟得上时,再切换回RECENT_DAYS模式进行进一步的处理,最终的离线Clustering命令:常规: nohup flink run \ -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob \ lib/hudi-flink1.14-bundle-0.12.3.jar \ --service \ --clustering-delta-commits 8 \ --small-file-limit 90 \ --target-file-max-bytes 125829120 \ --plan-partition-filter-mode RECENT_DAYS \ --target-partitions 2 \ --path hdfs://...... > clustering.log & 处理积压: flink run -c \ org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob \ lib/hudi-flink1.14-bundle-0.12.3.jar \ --service \ --min-clustering-interval-seconds 3 \ --max-num-groups 1000 \ --clustering-delta-commits 1 \ --small-file-limit 90 \ --target-file-max-bytes 125829120 \ --path ${target_path}在MERGE_ON_READ模式下,当前分区的文件数量与compaction.delta_commits、clean.retain_commits有关。但是当clean.retain_commits大于10时,可能会出现parquet和log文件无法被清理干净的情况,导致部分小文件滞留在历史分区中。HoodieFlinkClusteringJob类可以接受多个参数,可以在源码org.apache.hudi.sink.clustering.FlinkClusteringConfig中或使用如下命令查看:flink run \ -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob \ ../flink-1.14.2/lib/hudi-flink1.14-bundle-0.12.3.jar \ -h需要注意的是Boolean类型的参数,默认都为false,如果要开启,直接使用--param_name即可,而不是--param_name true,Hudi中解析参数使用的JCommander,如下图所示,如果Boolean参数后面跟true、false、0、1,都会在遍历参数时错位,导致任务提交失败,报错提示和这种配置方式并不是很友好,很容易让使用者误认为该参数是不可配置的!通过适当增加以下两个超时参数,可以有效避免因网络波动等原因导致的超时问题,从而降低任务失败的概率。set execution.checkpointing.timeout=600000 properties.request.timeout.ms=120000(connector为kafka时)以下参数设置为true时,如果TaskManager发生了Akka错误,例如无法与JobManager通信或其他网络问题,TaskManager将会立即退出;设置为false时,TaskManager将不会主动退出,而是继续运行,这可能会导致系统处于不稳定状态,并且需要人工干预来解决问题。taskmanager.exit-on-fatal-akka-error: true改动Hudi源码或者适配低版本大数据组件,参考了如下文章,可顺利完成编译打包:https://blog.csdn.net/weixin_45417821/article/details/127407461mvn clean package -DskipTests -Drat.skip=true -Pflink-bundle-shade-hive3 -Dflink1.14 -Dscala-2.12 mvn clean install -DskipTests -Dscala-2.12 -Pflink-bundle-shade-hive3
2023年12月30日
2,129 阅读
65 点赞
2023-07-01
Flink流批一体作业管理平台
前言随着大数据处理需求的不断增长,流批一体作业管理平台的重要性愈发凸显。本文将介绍如何针对 Flink + Hudi 流批一体任务进行管理,特别针对 Hudi 任务的需求,支持一键启动离线 Compaction 和 Clustering 任务,保证数据湖的稳定运行。背景在大数据处理领域,简化作业的提交、监控和管理是一项重要的任务。调研过程中发现,现有的开源系统如 Dinky 和 StreamPark 在任务生成和监控方面存在一定局限性。它们使用了 Flink 或 YARN 源码中的 API,如 org.apache.hadoop.yarn.client.api.YarnClient 、 org.apache.hadoop.hdfs.DistributedFileSystem 等,导致系统与 Hadoop 集群和 Flink 客户端版本之间耦合度高,Hadoop集群使用权限要求较高。系统开发基于项目 flink-streaming-platform-web 进行开发,通过优化和功能扩展(如:对使用FlinkSQL操作Hudi表的支持),构建了一个无侵入性、与 Hadoop 集群和 Flink 客户端版本解耦的流批一体作业调度平台。只需在装有 Flink 客户端的机器上启动系统,即可轻松使用,所有任务均使用 Flink 自带的命令进行提交,如:# 提交 Compaction 任务 flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor ... # 提交 Clustering 任务 flink run -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob ...核心功能流任务管理支持 FlinkSQL 和 JAR 提交任务,提供流式作业的提交、监控、告警和日志查看等功能,为用户提供全方位的作业管理服务。批任务调度支持 FlinkSQL 和 JAR 提交任务,可定时调度批处理作业,定时完成数据处理任务,提高数据处理效率。数据湖管理(Hudi)特别针对 Hudi 任务的需求,系统支持对 MOR(Merge On Read)和 COW(Copy On Write)模式下的离线 Compaction 和离线 Clustering 任务进行管理,保证数据湖的稳定运行和数据质量。架构设计该平台的架构设计是其高效运行的关键,采用了模块化设计,将流任务管理、批任务调度和数据湖管理等功能模块化,实现了高度的灵活性和可扩展性。每个模块都具有清晰的职责和接口,使得系统易于维护和扩展。流任务管理模块负责接收用户提交的流式作业,并将其转换为 Flink 任务进行执行。这个模块需要实现任务的监控和告警功能,以确保作业的稳定运行。采用了分布式监控系统来实现实时监控和告警,保障了系统的高可用性和可靠性。批任务调度模块则负责定时调度批处理作业,并在预定的时间点执行数据处理任务。这个模块需要考虑到作业的依赖关系和执行顺序,以确保数据处理任务按时完成。采用了依赖调度策略来解决这个问题,有效地提高了作业的执行效率。数据湖管理模块是该平台的重要组成部分,特别针对 Hudi 任务的需求进行了优化。实现了对 MOR 和 COW 模式下的离线 Compaction 和离线 Clustering 任务的管理,确保了数据湖的稳定运行和数据质量。该平台极大地提升了流批一体作业创建的效率和灵活性,提供了更便捷、可靠的作业管理解决方案。未来,将持续优化系统功能,以满足需求,并助力流批一体作业的稳定运行和管理。
2023年07月01日
1,473 阅读
91 点赞
1
2
...
6