首页
Search
1
JAVA垃圾回收
993 阅读
2
Kafka、RocketMQ消息队列总结
981 阅读
3
Flink on Kubernetes 计算和存储分离落地实践
969 阅读
4
Linux免密登陆-ubuntu
904 阅读
5
Redis集群部署方案
890 阅读
大数据
Flink
后端
Java
笔记
运维
游客
Search
标签搜索
大数据
Flink
离线
实时
Redis
OpenJDK
Java
笔记
JVM
Elasticsearch
GC
Hadoop
Hudi
Flink CDC
K8S
数据湖
TOTC
累计撰写
307
篇文章
累计阅读
104.3万
次
首页
栏目
大数据
Flink
后端
Java
笔记
运维
页面
搜索到
5
篇与
Flink
的结果
返回首页
2023-12-30
Flink + Hudi 流批一体作业稳定性优化
目前,任务分为两种类型:当业务逻辑较为简单时,使用 Flink SQL 进行处理,例如将原始日志或业务库同步至 Hudi 的 ODS 层、进行多表关联和聚合等操作;当业务逻辑比较复杂或需要特殊处理时,例如部分数据需要通过 API 获取,则使用 Flink DataStream API 消费 Hudi 数据,并经过一系列处理后,写入 Hudi 的 DWD、DWS 或 ADS 层。针对内存方面的问题,对Flink写入Hudi任务多个Task的Heap使用率进行了监控。注意到使用率存在较大的波动,有时甚至可能超出分配的阈值,这种情况可能导致任务被强制终止。Yarn Application报错信息: org.apache.flink.runtime.entrypoint.ClusterEntrypoint - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested. Association with remote system [akka.tcp://flink] has failed, address is now Association with remote system [akka.tcp://flink] has failed,address is now gated for [50] ms NodeManager Local logs报错信息: org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Container [pid=13427,containerID=container_e632_1670877056012_20807318_01_000001] is running beyond physical memory limits. Current usage: 2.0 GB of 2 GB physical memory used; 4.5 GB of 4.2 GB virtual memory used. Killing container.可以通过在yarn-site.xml中将yarn.nodemanager.vmem-check-enabled设为false,或者为jobmanager和taskmanager分配足够的内存来提高任务的稳定性。<property> <name>yarn.nodemanager.vmem-check-enabled</name> <value>false</value> </property> 注:是否启用一个线程检查每个任务正在使用的虚拟内存量,如果任务超出了分配值,则直接将其kill,默认是true另一个问题是,Managed Memory的使用率较低,而Task Heap的使用率却相当高。在增加TaskManager的内存分配(通过-ytm参数)时,会根据taskmanager.memory.managed.fraction的设置将新增的内存分配给Managed Memory。这可能会造成资源浪费。适当减小该配置的值有助于提高整体内存利用率。taskmanager.memory.managed.fraction: 0.2使用Hudi写入可能导致节点过载,从而导致上游算子背压。在执行SQL时,可以考虑提高写入并行度。如果是DateStream API Hudi到Hudi的数据传输,由于数据分布可能不均,建议在读取后直接使用rebalance()算子进行数据均衡处理。对于当前分区文件数量问题,写入Hudi任务的并行度会直接影响文件数量,随着并行度的增加,文件数量也会相应增加。此外,对于COPY_ON_WRITE模式,以下三个参数会直接影响文件数量。任务在运行一段时间后,当合并操作和触发操作达到平衡时,文件数量会在很小范围内波动。此外,可以通过减小clustering.delta_commits来减少当前分区文件数量。clustering.delta_commits 触发合并文件所需的提交次数 clean.retain_commits 保留的提交数,不进行清理 cleaner.policy(KEEP_LATEST_COMMITS)相对于在线clustering而言,离线clustering更加稳定并且可以实现资源隔离。经过大量任务长时间运行后来看,在使用离线clustering时需要注意以下问题。常用离线clustering命令如下:nohup flink run \ -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob \ lib/${bundle_jar_name}.jar \ --service \ --path ${table_path} > clustering.log & \以上离线clustering运行时,--plan-partition-filter-mode默认为NONE,所有符合条件的分区都会加入到任务当中,根据使用情况来看,随着历史分区的增加,资源使用逐渐处于较高的水平,该模式比较适用于临时处理大量滞留文件。同时,当数据量较大时还存在一个问题,当Clustering异常时,重启任务,偶尔会报错提示历史分区某parquet不存在(社区群有同学反馈过该问题)。通过配置参数--plan-partition-filter-mode RECENT_DAYS --target-partitions 2可以解决该问题。还可以使用--plan-partition-filter-mode的SELECTED_PARTITIONS模式,划定历史分区的范围,处理积压问题。Flink离线Clustering使用建议,分区过滤模式RECENT_DAYS和NONE的组合使用相对较为稳定,适用于线上环境。当历史分区中存在大量待Clustering的文件时,可以临时使用NONE模式压缩几次。等到压缩速度跟得上时,再切换回RECENT_DAYS模式进行进一步的处理,最终的离线Clustering命令:常规: nohup flink run \ -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob \ lib/hudi-flink1.14-bundle-0.12.3.jar \ --service \ --clustering-delta-commits 8 \ --small-file-limit 90 \ --target-file-max-bytes 125829120 \ --plan-partition-filter-mode RECENT_DAYS \ --target-partitions 2 \ --path hdfs://...... > clustering.log & 处理积压: flink run -c \ org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob \ lib/hudi-flink1.14-bundle-0.12.3.jar \ --service \ --min-clustering-interval-seconds 3 \ --max-num-groups 1000 \ --clustering-delta-commits 1 \ --small-file-limit 90 \ --target-file-max-bytes 125829120 \ --path ${target_path}在MERGE_ON_READ模式下,当前分区的文件数量与compaction.delta_commits、clean.retain_commits有关。但是当clean.retain_commits大于10时,可能会出现parquet和log文件无法被清理干净的情况,导致部分小文件滞留在历史分区中。HoodieFlinkClusteringJob类可以接受多个参数,可以在源码org.apache.hudi.sink.clustering.FlinkClusteringConfig中或使用如下命令查看:flink run \ -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob \ ../flink-1.14.2/lib/hudi-flink1.14-bundle-0.12.3.jar \ -h需要注意的是Boolean类型的参数,默认都为false,如果要开启,直接使用--param_name即可,而不是--param_name true,Hudi中解析参数使用的JCommander,如下图所示,如果Boolean参数后面跟true、false、0、1,都会在遍历参数时错位,导致任务提交失败,报错提示和这种配置方式并不是很友好,很容易让使用者误认为该参数是不可配置的!通过适当增加以下两个超时参数,可以有效避免因网络波动等原因导致的超时问题,从而降低任务失败的概率。set execution.checkpointing.timeout=600000 properties.request.timeout.ms=120000(connector为kafka时)以下参数设置为true时,如果TaskManager发生了Akka错误,例如无法与JobManager通信或其他网络问题,TaskManager将会立即退出;设置为false时,TaskManager将不会主动退出,而是继续运行,这可能会导致系统处于不稳定状态,并且需要人工干预来解决问题。taskmanager.exit-on-fatal-akka-error: true改动Hudi源码或者适配低版本大数据组件,参考了如下文章,可顺利完成编译打包:https://blog.csdn.net/weixin_45417821/article/details/127407461mvn clean package -DskipTests -Drat.skip=true -Pflink-bundle-shade-hive3 -Dflink1.14 -Dscala-2.12 mvn clean install -DskipTests -Dscala-2.12 -Pflink-bundle-shade-hive3
2023年12月30日
1,946 阅读
15 点赞
2023-07-01
Flink流批一体作业管理平台
前言随着大数据处理需求的不断增长,流批一体作业管理平台的重要性愈发凸显。本文将介绍如何针对 Flink + Hudi 流批一体任务进行管理,特别针对 Hudi 任务的需求,支持一键启动离线 Compaction 和 Clustering 任务,保证数据湖的稳定运行。背景在大数据处理领域,简化作业的提交、监控和管理是一项重要的任务。调研过程中发现,现有的开源系统如 Dinky 和 StreamPark 在任务生成和监控方面存在一定局限性。它们使用了 Flink 或 YARN 源码中的 API,如 org.apache.hadoop.yarn.client.api.YarnClient 、 org.apache.hadoop.hdfs.DistributedFileSystem 等,导致系统与 Hadoop 集群和 Flink 客户端版本之间耦合度高,Hadoop集群使用权限要求较高。系统开发基于项目 flink-streaming-platform-web 进行开发,通过优化和功能扩展(如:对使用FlinkSQL操作Hudi表的支持),构建了一个无侵入性、与 Hadoop 集群和 Flink 客户端版本解耦的流批一体作业调度平台。只需在装有 Flink 客户端的机器上启动系统,即可轻松使用,所有任务均使用 Flink 自带的命令进行提交,如:# 提交 Compaction 任务 flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor ... # 提交 Clustering 任务 flink run -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob ...核心功能流任务管理支持 FlinkSQL 和 JAR 提交任务,提供流式作业的提交、监控、告警和日志查看等功能,为用户提供全方位的作业管理服务。批任务调度支持 FlinkSQL 和 JAR 提交任务,可定时调度批处理作业,定时完成数据处理任务,提高数据处理效率。数据湖管理(Hudi)特别针对 Hudi 任务的需求,系统支持对 MOR(Merge On Read)和 COW(Copy On Write)模式下的离线 Compaction 和离线 Clustering 任务进行管理,保证数据湖的稳定运行和数据质量。架构设计该平台的架构设计是其高效运行的关键,采用了模块化设计,将流任务管理、批任务调度和数据湖管理等功能模块化,实现了高度的灵活性和可扩展性。每个模块都具有清晰的职责和接口,使得系统易于维护和扩展。流任务管理模块负责接收用户提交的流式作业,并将其转换为 Flink 任务进行执行。这个模块需要实现任务的监控和告警功能,以确保作业的稳定运行。采用了分布式监控系统来实现实时监控和告警,保障了系统的高可用性和可靠性。批任务调度模块则负责定时调度批处理作业,并在预定的时间点执行数据处理任务。这个模块需要考虑到作业的依赖关系和执行顺序,以确保数据处理任务按时完成。采用了依赖调度策略来解决这个问题,有效地提高了作业的执行效率。数据湖管理模块是该平台的重要组成部分,特别针对 Hudi 任务的需求进行了优化。实现了对 MOR 和 COW 模式下的离线 Compaction 和离线 Clustering 任务的管理,确保了数据湖的稳定运行和数据质量。该平台极大地提升了流批一体作业创建的效率和灵活性,提供了更便捷、可靠的作业管理解决方案。未来,将持续优化系统功能,以满足需求,并助力流批一体作业的稳定运行和管理。
2023年07月01日
1,359 阅读
91 点赞
2023-03-25
Flink on Kubernetes 计算和存储分离落地实践
云原生已成为业界的主要趋势之一。将Flink从Yarn迁移到Kubernetes平台带来了许多优势。在这种架构下,将计算和存储解耦,计算部分运行在Kubernetes上,而存储则使用HDFS等分布式存储系统。这样的架构优势在于可以根据实际情况独立调整计算和存储资源,从而提高整体的效率和弹性。本文将介绍四种Flink在Kubernetes上的部署模式。其中,两种是基于Native Kubernetes部署的,分别有Session模式和Application模式。另外两种是基于Flink Kubernetes Operator部署的,同样包括Session模式和Application模式。首先介绍基于Flink Kubernetes Operator部署的Application模式。如果要运行自己编写的jar包,需要先构建一个镜像。如果使用了HDFS、Hudi等其他组件,还需要在Dockerfile中将Hadoop客户端和配置文件复制到镜像中,并设置相应的环境变量。同时,将所有依赖的jar包复制到Flink Home的lib目录下。FROM flink:1.16.1-scala_2.12 USER root RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone ADD hadoop-3.1.1.tar.gz /opt ADD jdk1.8.0_121.tar.gz /opt RUN rm /opt/hadoop-3.1.1/share/hadoop/common/lib/commons-math3-3.1.1.jar RUN rm /opt/hadoop-3.1.1/share/hadoop/hdfs/lib/commons-math3-3.1.1.jar COPY commons-math3-3.6.1.jar /opt/hadoop-3.1.1/share/hadoop/common/lib/ COPY commons-math3-3.6.1.jar /opt/hadoop-3.1.1/share/hadoop/hdfs/lib/ RUN chmod -R 777 /opt/hadoop-3.1.1/share/hadoop/common/lib/ RUN mkdir -p /opt/hadoop/conf/ COPY yarn-site.xml /opt/hadoop/conf/ COPY core-site.xml /opt/hadoop/conf/ COPY hdfs-site.xml /opt/hadoop/conf/ COPY flink-shaded-hadoop-3-uber-3.1.1.7.0.3.0-79-7.0.jar $FLINK_HOME/lib/ COPY commons-cli-1.5.0.jar $FLINK_HOME/lib/ RUN mkdir $FLINK_HOME/mylib COPY xxx-1.0-SNAPSHOT.jar $FLINK_HOME/mylib RUN chown -R flink:flink $FLINK_HOME/mylib RUN echo 'export JAVA_HOME=/opt/jdk1.8.0_121 \n\ export HADOOP_HOME=/opt/hadoop-3.1.1 \n\ PATH=$PATH:$JAVA_HOME/bin \n\ PATH=$PATH:$HADOOP_HOME/bin'\ >> ~/.bashrc EXPOSE 8081构建镜像,在Dockerfile所在的目录中执行以下命令,确保该目录包含用于构建镜像的文件。docker build -t flink-native/flink-on-k8s-xxxx .安装helmcurl https://baltocdn.com/helm/signing.asc | sudo apt-key add -sudo apt-get install apt-transport-https --yes echo "deb https://baltocdn.com/helm/stable/debian/ all main" | sudo tee /etc/apt/sources.list.d/helm-stable-debian.list sudo apt-get update sudo apt-get install helm安装cert-manager组件,由它提供证书服务。kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml安装Flink Kubernetes Operatorhelm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.4.0/ helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator --namespace flink --create-namespace执行上述命令后,将会从ghcr.io/apache/flink-kubernetes-operator:7fc23a1镜像仓库拉取镜像。由于下载速度较慢,可以尝试从apache/flink-kubernetes-operator:7fc23a1仓库拉取镜像,然后为其添加标签docker tag apache/flink-kubernetes-operator:7fc23a1 ghcr.io/apache/flink-kubernetes-operator:7fc23a1如果在重新安装时遇到使用kubectl delete无法删除的情况,可以尝试以下命令来实现删除操作:kubectl patch crd/flinksessionjobs.flink.apache.org -p '{"metadata":{"finalizers":[]}}' --type=merge通过执行上述命令,可以成功删除该资源。查看自定义资源 kubectl get customresourcedefinition构建一个YAML文件来提交任务。其中,image指定了镜像,jarURI指定了jar包在镜像中的位置,entryClass指定了要执行的类,args指定了该类所需的参数:kind: FlinkDeployment metadata: name: flink-application-xxx spec: image: flink-native/flink-on-k8s-xxxx flinkVersion: v1_16 flinkConfiguration: taskmanager.numberOfTaskSlots: "n" serviceAccount: flink jobManager: resource: memory: "nm" cpu: n taskManager: resource: memory: "nm" cpu: n job: jarURI: local:///opt/flink/mylib/xxx-1.0-SNAPSHOT.jar entryClass: com.xxx.run.XXXJob parallelism: n upgradeMode: stateless args: ["hdfs://host:9000/data/input","hdfs://host:9000/data/output","n"]提交任务kubectl create -f xxx.yaml查看flinkdeployment kubectl get flinkdeployment查看日志kubectl logs -f deploy/flink-application-xxx{lamp/}Flink on K8S Session模式和Application模式需要安装Flink客户端,下载flink压缩包,解压即可设置命名空间首选项、赋权等kubectl create ns flink-native kubectl config set-context --current --namespace=flink-native kubectl create serviceaccount flink kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=cluster-admin --serviceaccount=flink-native:flink --namespace=flink-nativeSession模式,启动Flink集群bin/kubernetes-session.sh \ -Dkubernetes.cluster-id=xxx\ -Dkubernetes.container.image=flink-native/flink-on-k8s-xxxx \ -Dkubernetes.namespace=flink-native\ -Dkubernetes.service-account=flink \ -Dclassloader.check-leaked-classloader=false \ -Dkubernetes.rest-service.exposed.type=ClusterIP \ -Dtaskmanager.memory.process.size=4096m \ -Dkubernetes.taskmanager.cpu=2 \ -Dtaskmanager.numberOfTaskSlots=4 \ -Dresourcemanager.taskmanager-timeout=60000 \ -Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%"端口转发nohup kubectl -n flink-native port-forward --address 0.0.0.0 service/my-first-flink-cluster-rest 8081:8081 >port-forward.log &打开Flink Web UI,可以看到此时只有jobmanager向集群提交任务,运行测试任务bin/flink run -e kubernetes-session -Dkubernetes.namespace=flink-native -Dkubernetes.container.image=flink-native/flink-on-k8s-xxxx -Dkubernetes.rest-service.exposed.type=NodePort -Dkubernetes.cluster-id=my-first-flink-cluster examples/streaming/TopSpeedWindowing.jar运行自己的jar包bin/flink run -e kubernetes-session \ -Dkubernetes.namespace=flink-native \ -Dkubernetes.rest-service.exposed.type=NodePort \ -Dkubernetes.cluster-id=xxx \ -c com.xxx.run.XXXJob \ mylib/xxx-1.0-SNAPSHOT.jar hdfs://host:9000/data/input hdfs://host:9000/data/output 2查看pod,此时可以看到生成了taskmanagerkubectl get pod -o wide -A查看日志,使用以下命令可以看到测试程序TopSpeedWindowing的输出结果kubectl logs my-first-flink-cluster-taskmanager-1-1 -n flink-native查看任务列表:bin/flink list --target kubernetes-session -Dkubernetes.namespace=flink-native -Dkubernetes.jobmanager.service-account=flink -Dkubernetes.cluster-id=xxx 根据ID删除任务:bin/flink cancel --target kubernetes-session -Dkubernetes.namespace=flink-native -Dkubernetes.jobmanager.service-account=flink -Dkubernetes.cluster-id=xxxxr 3ff3c5a5e3c2f47e024e2771dc108f77Application模式bin/flink run-application \ --target kubernetes-application \ -Dkubernetes.cluster-id=xxx\ -Dkubernetes.container.image=flink-native/flink-on-k8s-xxxx \ -Dkubernetes.namespace=flink-native\ -Dkubernetes.service-account=flink \ -Dclassloader.check-leaked-classloader=false \ -Dkubernetes.rest-service.exposed.type=ClusterIP \ -c com.sohu.longuserprofile.run.TestJob \ local:///opt/flink/mylib/xxx-1.0-SNAPSHOT.jar hdfs://host:9000/data/input hdfs://host:9000/data/output 2Session模式只能提交本地(宿主机)jar包,Application模式只能使用local:///{lamp/}常用命令k8s web ui 登录token获取 kubectl -n kubernetes-dashboard describe secret $(kubectl -n kubernetes-dashboard get secret | grep dashboard-admin | awk '{print $1}') | grep token: 查看所有pod列表 kubectl get pod -o wide -A 查看pod详细信息 kubectl describe pod pod_name -n flink-native 删除deployment kubectl delete deployment/my-first-flink-cluster 进入pod kubectl exec -it -n flink-native pod_name /bin/bash 获得所有命名空间 kubectl get namespace 拷贝出来 kubectl cp -n application psqls-0:/var/lib/postgresql/data/pg_wal /home 拷贝进去 kubectl cp /home/dades/pg_wal -n application psqls-0:/var/lib/postgresql/data/pg_wal
2023年03月25日
969 阅读
6 点赞
2022-12-03
Flink 任务执行流程源码解析
用户提交Flink任务时,通过先后调用transform()——>doTransform()——>addOperator()方法,将map、flatMap、filter、process等算子添加到List<Transformation<?>> transformations集合中。在执行execute()方法时,会使用StreamGraphGenerator的generate()方法构建流拓扑StreamGraph(即Pipeline),数据结构属于有向无环图。在StreamGraph中,StreamNode用于记录算子信息,而StreamEdge则用于记录数据交换方式,包括以下几种Partitioner:{dotted startColor="#b3b2b2" endColor="#b3b2b2"/}{dotted startColor="#b3b2b2" endColor="#b3b2b2"/}Partitioner类都是StreamPartitioner类的子类,它们通过实现isPointwise()方法来确定自身的类型。一种是ALL_TO_ALL,另一个种是POINTWISE。/** * A distribution pattern determines, which sub tasks of a producing task are connected to which * consuming sub tasks. * * <p>It affects how {@link ExecutionVertex} and {@link IntermediateResultPartition} are connected * in {@link EdgeManagerBuildUtil} */ public enum DistributionPattern { /** Each producing sub task is connected to each sub task of the consuming task. */ ALL_TO_ALL, /** Each producing sub task is connected to one or more subtask(s) of the consuming task. */ POINTWISE }ALL_TO_ALL意味着上游的每个subtask需要与下游的每个subtask建立连接。{dotted startColor="#b3b2b2" endColor="#b3b2b2"/}{dotted startColor="#b3b2b2" endColor="#b3b2b2"/}POINTWISE则是上游的每个subtask和下游的一个或多个subtask连接。{dotted startColor="#b3b2b2" endColor="#b3b2b2"/}{dotted startColor="#b3b2b2" endColor="#b3b2b2"/}StreamGraph构建完成后,,会通过 PipelineExecutorUtils.getJobGraph()构建JobGraph,具体流程是:——>PipelineExecutorUtils.getJobGraph() ——>FlinkPipelineTranslationUtil.getJobGraph() ——>StreamGraphTranslator.translateToJobGraph() ——>StreamGraph.getJobGraph() ——>StreamingJobGraphGenerator.createJobGraph()JobGraph是优化后的StreamGraph,如果相连的算子支持chaining,合并到一个StreamNode,chaining在StreamingJobGraphGenerator的setChaining()方法中实现:/** * Sets up task chains from the source {@link StreamNode} instances. * * <p>This will recursively create all {@link JobVertex} instances. */ private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes) { // we separate out the sources that run as inputs to another operator (chained inputs) // from the sources that needs to run as the main (head) operator. final Map<Integer, OperatorChainInfo> chainEntryPoints = buildChainedInputsAndGetHeadInputs(hashes, legacyHashes); final Collection<OperatorChainInfo> initialEntryPoints = chainEntryPoints.entrySet().stream() .sorted(Comparator.comparing(Map.Entry::getKey)) .map(Map.Entry::getValue) .collect(Collectors.toList()); // iterate over a copy of the values, because this map gets concurrently modified for (OperatorChainInfo info : initialEntryPoints) { createChain( info.getStartNodeId(), 1, // operators start at position 1 because 0 is for chained source inputs info, chainEntryPoints); } }将符合chaining条件的,合并到一个StreamNode条件如下:1. 下游节点输入边只有一个 2. 与下游属于同一个SlotSharingGroup 3. 数据分发策略Forward 4. 流数据交换模式不是批量模式 5. 上下游并行度相等 6. StreamGraph中chaining为true streamGraph 是可以 chain的 7. 算子是否可以链化areOperatorsChainable代码如下:public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) { StreamNode downStreamVertex = streamGraph.getTargetVertex(edge); return downStreamVertex.getInEdges().size() == 1 && isChainableInput(edge, streamGraph); } private static boolean isChainableInput(StreamEdge edge, StreamGraph streamGraph) { StreamNode upStreamVertex = streamGraph.getSourceVertex(edge); StreamNode downStreamVertex = streamGraph.getTargetVertex(edge); if (!(upStreamVertex.isSameSlotSharingGroup(downStreamVertex) && areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph) && (edge.getPartitioner() instanceof ForwardPartitioner) && edge.getExchangeMode() != StreamExchangeMode.BATCH && upStreamVertex.getParallelism() == downStreamVertex.getParallelism() && streamGraph.isChainingEnabled())) { return false; } ... ... 从Source节点开始,使用深度优先搜索(DFS)算法递归遍历有向无环图中的所有StreamNode节点。{dotted startColor="#b3b2b2" endColor="#b3b2b2"/}{dotted startColor="#b3b2b2" endColor="#b3b2b2"/}待续 ... ...
2022年12月03日
1,316 阅读
31 点赞
2021-11-20
Flink实时计算问题记录与解决方案
当Flink任务的并行度大于Kafka分区数时,可能会导致部分并行度空闲,进而影响水位线(watermark)的生成。为了解决这个问题,可以通过设置withIdleness来进行调整:WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withIdleness(Duration.ofSeconds(60))对于withIdleness参数,应避免将下游任务设置得太小。原因在于,如果上游任务因故障停止,而其恢复所需时间超过了下游任务设置的withIdleness值,那么下游任务会将超时的分区标记为不再消费,导致数据丢失。为避免此问题,建议将分区数设置为不小于任务的并行度,并不设置withIdleness参数,这样可以有效防止潜在的数据丢失情况。{lamp/}kafkaSource指定时间戳消费时,必须为毫秒时间戳,Flink 1.14官网文档为秒,是错误的,指定后不会生效。setStartingOffsets(OffsetsInitializer.timestamp(1654703973000L)){lamp/}要实现Flink与Kafka的端到端一致性,需要确保Kafka的版本不低于2.5。要注意的是,Flink 1.14.2中flink-connector所包含的kafka-clients版本是2.4.X。because of a bug in the Kafka broker (KAFKA-9310). Please upgrade to Kafka 2.5+. If you are running with concurrent checkpoints, you also may want to try without them.Flink-Kafka端到端一致性需要设置TRANSACTIONAL_ID_CONFIG = "transactional.id",如果不设置,从checkpoint重启会报错:OutOfOrderSequenceException: The broker received an out of order sequence number。{lamp/}Flink CDC同步mysql时,需要把binlog配置成ROW模式,查看命令和配置方法如下:show variables like 'binlog_format%'; vi /etc/my.cnf binlog_format=row systemctl restart mariadb.service非ROW模式时会报以下错误:Caused by: org.apache.flink.table.api.ValidationException: The MySQL server is configured with binlog_format MIXED rather than ROW, which is required for this connector to work properly. Change the MySQL configuration to use a binlog_format=ROW and restart the connector.Flink 1.14.2版本使用CDC 2.2,需要编译CDC源码进行版本适配:1.pom文件中修改flink版本为1.14.2、scala版本为2.12.72.修改flink-table-planner-blink为flink-table-planner;flink-table-runtime-blink为flink-table-runtime3.flink-shaded-guava版本由30.1.1-jre-14.0修改为18.0-13.0修改完成后,会出现部分import报错的情况。需要根据新依赖版本中的路径和类进行相应的修改,例如,将创建TimestampFormat的代码修改为:TimestampFormat timestampOption = JsonFormatOptionsUtil.getTimestampFormat(formatOptions)。在编译过程中,首先需要使用install命令对父module进行安装。这样可以确保本地Maven仓库中包含各个子module的JAR包。对子module进行打包时,如Flink MySQL CDC,可以在子module的POM文件中修改打包方式,将所有依赖项都打包到一个JAR文件中,这样在工程中只需引入一个<dependency>即可。否则,会因为缺少某些依赖报错,如:Could not initialize class io.debezium.connector.mysql.MySqlConnectorConfig
2021年11月20日
1,630 阅读
10 点赞