首页
Search
1
JAVA垃圾回收
993 阅读
2
Kafka、RocketMQ消息队列总结
981 阅读
3
Flink on Kubernetes 计算和存储分离落地实践
969 阅读
4
Linux免密登陆-ubuntu
904 阅读
5
Redis集群部署方案
890 阅读
大数据
Flink
后端
Java
笔记
运维
游客
Search
标签搜索
大数据
Flink
离线
实时
Redis
OpenJDK
Java
笔记
JVM
Elasticsearch
GC
Hadoop
Hudi
Flink CDC
K8S
数据湖
TOTC
累计撰写
307
篇文章
累计阅读
104.3万
次
首页
栏目
大数据
Flink
后端
Java
笔记
运维
页面
搜索到
7
篇与
大数据
的结果
返回首页
2023-12-30
Flink + Hudi 流批一体作业稳定性优化
目前,任务分为两种类型:当业务逻辑较为简单时,使用 Flink SQL 进行处理,例如将原始日志或业务库同步至 Hudi 的 ODS 层、进行多表关联和聚合等操作;当业务逻辑比较复杂或需要特殊处理时,例如部分数据需要通过 API 获取,则使用 Flink DataStream API 消费 Hudi 数据,并经过一系列处理后,写入 Hudi 的 DWD、DWS 或 ADS 层。针对内存方面的问题,对Flink写入Hudi任务多个Task的Heap使用率进行了监控。注意到使用率存在较大的波动,有时甚至可能超出分配的阈值,这种情况可能导致任务被强制终止。Yarn Application报错信息: org.apache.flink.runtime.entrypoint.ClusterEntrypoint - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested. Association with remote system [akka.tcp://flink] has failed, address is now Association with remote system [akka.tcp://flink] has failed,address is now gated for [50] ms NodeManager Local logs报错信息: org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Container [pid=13427,containerID=container_e632_1670877056012_20807318_01_000001] is running beyond physical memory limits. Current usage: 2.0 GB of 2 GB physical memory used; 4.5 GB of 4.2 GB virtual memory used. Killing container.可以通过在yarn-site.xml中将yarn.nodemanager.vmem-check-enabled设为false,或者为jobmanager和taskmanager分配足够的内存来提高任务的稳定性。<property> <name>yarn.nodemanager.vmem-check-enabled</name> <value>false</value> </property> 注:是否启用一个线程检查每个任务正在使用的虚拟内存量,如果任务超出了分配值,则直接将其kill,默认是true另一个问题是,Managed Memory的使用率较低,而Task Heap的使用率却相当高。在增加TaskManager的内存分配(通过-ytm参数)时,会根据taskmanager.memory.managed.fraction的设置将新增的内存分配给Managed Memory。这可能会造成资源浪费。适当减小该配置的值有助于提高整体内存利用率。taskmanager.memory.managed.fraction: 0.2使用Hudi写入可能导致节点过载,从而导致上游算子背压。在执行SQL时,可以考虑提高写入并行度。如果是DateStream API Hudi到Hudi的数据传输,由于数据分布可能不均,建议在读取后直接使用rebalance()算子进行数据均衡处理。对于当前分区文件数量问题,写入Hudi任务的并行度会直接影响文件数量,随着并行度的增加,文件数量也会相应增加。此外,对于COPY_ON_WRITE模式,以下三个参数会直接影响文件数量。任务在运行一段时间后,当合并操作和触发操作达到平衡时,文件数量会在很小范围内波动。此外,可以通过减小clustering.delta_commits来减少当前分区文件数量。clustering.delta_commits 触发合并文件所需的提交次数 clean.retain_commits 保留的提交数,不进行清理 cleaner.policy(KEEP_LATEST_COMMITS)相对于在线clustering而言,离线clustering更加稳定并且可以实现资源隔离。经过大量任务长时间运行后来看,在使用离线clustering时需要注意以下问题。常用离线clustering命令如下:nohup flink run \ -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob \ lib/${bundle_jar_name}.jar \ --service \ --path ${table_path} > clustering.log & \以上离线clustering运行时,--plan-partition-filter-mode默认为NONE,所有符合条件的分区都会加入到任务当中,根据使用情况来看,随着历史分区的增加,资源使用逐渐处于较高的水平,该模式比较适用于临时处理大量滞留文件。同时,当数据量较大时还存在一个问题,当Clustering异常时,重启任务,偶尔会报错提示历史分区某parquet不存在(社区群有同学反馈过该问题)。通过配置参数--plan-partition-filter-mode RECENT_DAYS --target-partitions 2可以解决该问题。还可以使用--plan-partition-filter-mode的SELECTED_PARTITIONS模式,划定历史分区的范围,处理积压问题。Flink离线Clustering使用建议,分区过滤模式RECENT_DAYS和NONE的组合使用相对较为稳定,适用于线上环境。当历史分区中存在大量待Clustering的文件时,可以临时使用NONE模式压缩几次。等到压缩速度跟得上时,再切换回RECENT_DAYS模式进行进一步的处理,最终的离线Clustering命令:常规: nohup flink run \ -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob \ lib/hudi-flink1.14-bundle-0.12.3.jar \ --service \ --clustering-delta-commits 8 \ --small-file-limit 90 \ --target-file-max-bytes 125829120 \ --plan-partition-filter-mode RECENT_DAYS \ --target-partitions 2 \ --path hdfs://...... > clustering.log & 处理积压: flink run -c \ org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob \ lib/hudi-flink1.14-bundle-0.12.3.jar \ --service \ --min-clustering-interval-seconds 3 \ --max-num-groups 1000 \ --clustering-delta-commits 1 \ --small-file-limit 90 \ --target-file-max-bytes 125829120 \ --path ${target_path}在MERGE_ON_READ模式下,当前分区的文件数量与compaction.delta_commits、clean.retain_commits有关。但是当clean.retain_commits大于10时,可能会出现parquet和log文件无法被清理干净的情况,导致部分小文件滞留在历史分区中。HoodieFlinkClusteringJob类可以接受多个参数,可以在源码org.apache.hudi.sink.clustering.FlinkClusteringConfig中或使用如下命令查看:flink run \ -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob \ ../flink-1.14.2/lib/hudi-flink1.14-bundle-0.12.3.jar \ -h需要注意的是Boolean类型的参数,默认都为false,如果要开启,直接使用--param_name即可,而不是--param_name true,Hudi中解析参数使用的JCommander,如下图所示,如果Boolean参数后面跟true、false、0、1,都会在遍历参数时错位,导致任务提交失败,报错提示和这种配置方式并不是很友好,很容易让使用者误认为该参数是不可配置的!通过适当增加以下两个超时参数,可以有效避免因网络波动等原因导致的超时问题,从而降低任务失败的概率。set execution.checkpointing.timeout=600000 properties.request.timeout.ms=120000(connector为kafka时)以下参数设置为true时,如果TaskManager发生了Akka错误,例如无法与JobManager通信或其他网络问题,TaskManager将会立即退出;设置为false时,TaskManager将不会主动退出,而是继续运行,这可能会导致系统处于不稳定状态,并且需要人工干预来解决问题。taskmanager.exit-on-fatal-akka-error: true改动Hudi源码或者适配低版本大数据组件,参考了如下文章,可顺利完成编译打包:https://blog.csdn.net/weixin_45417821/article/details/127407461mvn clean package -DskipTests -Drat.skip=true -Pflink-bundle-shade-hive3 -Dflink1.14 -Dscala-2.12 mvn clean install -DskipTests -Dscala-2.12 -Pflink-bundle-shade-hive3
2023年12月30日
1,946 阅读
15 点赞
2023-07-01
Flink流批一体作业管理平台
前言随着大数据处理需求的不断增长,流批一体作业管理平台的重要性愈发凸显。本文将介绍如何针对 Flink + Hudi 流批一体任务进行管理,特别针对 Hudi 任务的需求,支持一键启动离线 Compaction 和 Clustering 任务,保证数据湖的稳定运行。背景在大数据处理领域,简化作业的提交、监控和管理是一项重要的任务。调研过程中发现,现有的开源系统如 Dinky 和 StreamPark 在任务生成和监控方面存在一定局限性。它们使用了 Flink 或 YARN 源码中的 API,如 org.apache.hadoop.yarn.client.api.YarnClient 、 org.apache.hadoop.hdfs.DistributedFileSystem 等,导致系统与 Hadoop 集群和 Flink 客户端版本之间耦合度高,Hadoop集群使用权限要求较高。系统开发基于项目 flink-streaming-platform-web 进行开发,通过优化和功能扩展(如:对使用FlinkSQL操作Hudi表的支持),构建了一个无侵入性、与 Hadoop 集群和 Flink 客户端版本解耦的流批一体作业调度平台。只需在装有 Flink 客户端的机器上启动系统,即可轻松使用,所有任务均使用 Flink 自带的命令进行提交,如:# 提交 Compaction 任务 flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor ... # 提交 Clustering 任务 flink run -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob ...核心功能流任务管理支持 FlinkSQL 和 JAR 提交任务,提供流式作业的提交、监控、告警和日志查看等功能,为用户提供全方位的作业管理服务。批任务调度支持 FlinkSQL 和 JAR 提交任务,可定时调度批处理作业,定时完成数据处理任务,提高数据处理效率。数据湖管理(Hudi)特别针对 Hudi 任务的需求,系统支持对 MOR(Merge On Read)和 COW(Copy On Write)模式下的离线 Compaction 和离线 Clustering 任务进行管理,保证数据湖的稳定运行和数据质量。架构设计该平台的架构设计是其高效运行的关键,采用了模块化设计,将流任务管理、批任务调度和数据湖管理等功能模块化,实现了高度的灵活性和可扩展性。每个模块都具有清晰的职责和接口,使得系统易于维护和扩展。流任务管理模块负责接收用户提交的流式作业,并将其转换为 Flink 任务进行执行。这个模块需要实现任务的监控和告警功能,以确保作业的稳定运行。采用了分布式监控系统来实现实时监控和告警,保障了系统的高可用性和可靠性。批任务调度模块则负责定时调度批处理作业,并在预定的时间点执行数据处理任务。这个模块需要考虑到作业的依赖关系和执行顺序,以确保数据处理任务按时完成。采用了依赖调度策略来解决这个问题,有效地提高了作业的执行效率。数据湖管理模块是该平台的重要组成部分,特别针对 Hudi 任务的需求进行了优化。实现了对 MOR 和 COW 模式下的离线 Compaction 和离线 Clustering 任务的管理,确保了数据湖的稳定运行和数据质量。该平台极大地提升了流批一体作业创建的效率和灵活性,提供了更便捷、可靠的作业管理解决方案。未来,将持续优化系统功能,以满足需求,并助力流批一体作业的稳定运行和管理。
2023年07月01日
1,359 阅读
91 点赞
2022-12-03
Flink 任务执行流程源码解析
用户提交Flink任务时,通过先后调用transform()——>doTransform()——>addOperator()方法,将map、flatMap、filter、process等算子添加到List<Transformation<?>> transformations集合中。在执行execute()方法时,会使用StreamGraphGenerator的generate()方法构建流拓扑StreamGraph(即Pipeline),数据结构属于有向无环图。在StreamGraph中,StreamNode用于记录算子信息,而StreamEdge则用于记录数据交换方式,包括以下几种Partitioner:{dotted startColor="#b3b2b2" endColor="#b3b2b2"/}{dotted startColor="#b3b2b2" endColor="#b3b2b2"/}Partitioner类都是StreamPartitioner类的子类,它们通过实现isPointwise()方法来确定自身的类型。一种是ALL_TO_ALL,另一个种是POINTWISE。/** * A distribution pattern determines, which sub tasks of a producing task are connected to which * consuming sub tasks. * * <p>It affects how {@link ExecutionVertex} and {@link IntermediateResultPartition} are connected * in {@link EdgeManagerBuildUtil} */ public enum DistributionPattern { /** Each producing sub task is connected to each sub task of the consuming task. */ ALL_TO_ALL, /** Each producing sub task is connected to one or more subtask(s) of the consuming task. */ POINTWISE }ALL_TO_ALL意味着上游的每个subtask需要与下游的每个subtask建立连接。{dotted startColor="#b3b2b2" endColor="#b3b2b2"/}{dotted startColor="#b3b2b2" endColor="#b3b2b2"/}POINTWISE则是上游的每个subtask和下游的一个或多个subtask连接。{dotted startColor="#b3b2b2" endColor="#b3b2b2"/}{dotted startColor="#b3b2b2" endColor="#b3b2b2"/}StreamGraph构建完成后,,会通过 PipelineExecutorUtils.getJobGraph()构建JobGraph,具体流程是:——>PipelineExecutorUtils.getJobGraph() ——>FlinkPipelineTranslationUtil.getJobGraph() ——>StreamGraphTranslator.translateToJobGraph() ——>StreamGraph.getJobGraph() ——>StreamingJobGraphGenerator.createJobGraph()JobGraph是优化后的StreamGraph,如果相连的算子支持chaining,合并到一个StreamNode,chaining在StreamingJobGraphGenerator的setChaining()方法中实现:/** * Sets up task chains from the source {@link StreamNode} instances. * * <p>This will recursively create all {@link JobVertex} instances. */ private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes) { // we separate out the sources that run as inputs to another operator (chained inputs) // from the sources that needs to run as the main (head) operator. final Map<Integer, OperatorChainInfo> chainEntryPoints = buildChainedInputsAndGetHeadInputs(hashes, legacyHashes); final Collection<OperatorChainInfo> initialEntryPoints = chainEntryPoints.entrySet().stream() .sorted(Comparator.comparing(Map.Entry::getKey)) .map(Map.Entry::getValue) .collect(Collectors.toList()); // iterate over a copy of the values, because this map gets concurrently modified for (OperatorChainInfo info : initialEntryPoints) { createChain( info.getStartNodeId(), 1, // operators start at position 1 because 0 is for chained source inputs info, chainEntryPoints); } }将符合chaining条件的,合并到一个StreamNode条件如下:1. 下游节点输入边只有一个 2. 与下游属于同一个SlotSharingGroup 3. 数据分发策略Forward 4. 流数据交换模式不是批量模式 5. 上下游并行度相等 6. StreamGraph中chaining为true streamGraph 是可以 chain的 7. 算子是否可以链化areOperatorsChainable代码如下:public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) { StreamNode downStreamVertex = streamGraph.getTargetVertex(edge); return downStreamVertex.getInEdges().size() == 1 && isChainableInput(edge, streamGraph); } private static boolean isChainableInput(StreamEdge edge, StreamGraph streamGraph) { StreamNode upStreamVertex = streamGraph.getSourceVertex(edge); StreamNode downStreamVertex = streamGraph.getTargetVertex(edge); if (!(upStreamVertex.isSameSlotSharingGroup(downStreamVertex) && areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph) && (edge.getPartitioner() instanceof ForwardPartitioner) && edge.getExchangeMode() != StreamExchangeMode.BATCH && upStreamVertex.getParallelism() == downStreamVertex.getParallelism() && streamGraph.isChainingEnabled())) { return false; } ... ... 从Source节点开始,使用深度优先搜索(DFS)算法递归遍历有向无环图中的所有StreamNode节点。{dotted startColor="#b3b2b2" endColor="#b3b2b2"/}{dotted startColor="#b3b2b2" endColor="#b3b2b2"/}待续 ... ...
2022年12月03日
1,316 阅读
31 点赞
2021-11-20
Flink实时计算问题记录与解决方案
当Flink任务的并行度大于Kafka分区数时,可能会导致部分并行度空闲,进而影响水位线(watermark)的生成。为了解决这个问题,可以通过设置withIdleness来进行调整:WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withIdleness(Duration.ofSeconds(60))对于withIdleness参数,应避免将下游任务设置得太小。原因在于,如果上游任务因故障停止,而其恢复所需时间超过了下游任务设置的withIdleness值,那么下游任务会将超时的分区标记为不再消费,导致数据丢失。为避免此问题,建议将分区数设置为不小于任务的并行度,并不设置withIdleness参数,这样可以有效防止潜在的数据丢失情况。{lamp/}kafkaSource指定时间戳消费时,必须为毫秒时间戳,Flink 1.14官网文档为秒,是错误的,指定后不会生效。setStartingOffsets(OffsetsInitializer.timestamp(1654703973000L)){lamp/}要实现Flink与Kafka的端到端一致性,需要确保Kafka的版本不低于2.5。要注意的是,Flink 1.14.2中flink-connector所包含的kafka-clients版本是2.4.X。because of a bug in the Kafka broker (KAFKA-9310). Please upgrade to Kafka 2.5+. If you are running with concurrent checkpoints, you also may want to try without them.Flink-Kafka端到端一致性需要设置TRANSACTIONAL_ID_CONFIG = "transactional.id",如果不设置,从checkpoint重启会报错:OutOfOrderSequenceException: The broker received an out of order sequence number。{lamp/}Flink CDC同步mysql时,需要把binlog配置成ROW模式,查看命令和配置方法如下:show variables like 'binlog_format%'; vi /etc/my.cnf binlog_format=row systemctl restart mariadb.service非ROW模式时会报以下错误:Caused by: org.apache.flink.table.api.ValidationException: The MySQL server is configured with binlog_format MIXED rather than ROW, which is required for this connector to work properly. Change the MySQL configuration to use a binlog_format=ROW and restart the connector.Flink 1.14.2版本使用CDC 2.2,需要编译CDC源码进行版本适配:1.pom文件中修改flink版本为1.14.2、scala版本为2.12.72.修改flink-table-planner-blink为flink-table-planner;flink-table-runtime-blink为flink-table-runtime3.flink-shaded-guava版本由30.1.1-jre-14.0修改为18.0-13.0修改完成后,会出现部分import报错的情况。需要根据新依赖版本中的路径和类进行相应的修改,例如,将创建TimestampFormat的代码修改为:TimestampFormat timestampOption = JsonFormatOptionsUtil.getTimestampFormat(formatOptions)。在编译过程中,首先需要使用install命令对父module进行安装。这样可以确保本地Maven仓库中包含各个子module的JAR包。对子module进行打包时,如Flink MySQL CDC,可以在子module的POM文件中修改打包方式,将所有依赖项都打包到一个JAR文件中,这样在工程中只需引入一个<dependency>即可。否则,会因为缺少某些依赖报错,如:Could not initialize class io.debezium.connector.mysql.MySqlConnectorConfig
2021年11月20日
1,630 阅读
10 点赞
2018-09-08
Hadoop各版本汇总
Hadoop1.0 NameNode节点有且只有一个,虽然可以通过SecondaryNameNode进行主节点数据备份,但是存在延时情况,假如主节点挂掉,这时部分数据还未同步到SecondaryNameNode节点上,就会存在资源数据的缺失。因为NameNode是存储着DataNode节点等元数据信息。对于MapReduce,也是一个简单的主从结构,是有一个主JobTracker和多个从的TaskTracker组成,而且在hadoop1.0中JobTracker任务繁重。{lamp/}Hadoop2.0 增加了HDFS Federation(联邦)水平扩展,支持多个namenode同时运行,每一个namenode分管一批目录,然后共享所有datanode的存储资源,从而解决1.0当中单个namenode节点内存受限问题。HDFS的Federation,多个namenode(多个namespace),互相独立,互相协调,各自分工管理自己的区域,并不能解决单点故障问题,配合HA,每个namenode部署一个备机。增加了HDFS HA机制,解决了1.0中的单点故障问题,只支持两个节点,3.0实现了一主多从。增加了YARN框架,针对1.0中主JobTracker压力太大的不足,把JobTracker资源分配和作业控制分开,利用Resource Manager在namenode上进行资源管理调度,利用ApplicationMaster进行任务管理和任务监控。由NodeManager替代TaskTracker进行具体任务的执行,因此MapReduce2.0只是一个计算框架。对比1.0中相关资源的调用全部给Yarn框架管理。{lamp/}Hadoop3.0 Javaj运行环境升级为1.8,对之前低版本的Java不在支持。HDFS3.0支持数据的擦除编码,调高存储空间的使用率。一些默认端口的改变。增加一些MapReduce的调优。支持 2 个以上的 NameNode,例如,通过配置三个 NameNode 和五个 JournalNode,集群能够容忍两个节点而不是一个节点的故障。https://hadoop.apache.org/docs/r3.0.0
2018年09月08日
860 阅读
5 点赞
1
2