首页
Search
1
JAVA垃圾回收
993 阅读
2
Kafka、RocketMQ消息队列总结
981 阅读
3
Flink on Kubernetes 计算和存储分离落地实践
970 阅读
4
Linux免密登陆-ubuntu
904 阅读
5
Redis集群部署方案
890 阅读
大数据
Flink
后端
Java
笔记
运维
游客
Search
标签搜索
大数据
Flink
离线
实时
Redis
OpenJDK
Java
笔记
JVM
Elasticsearch
GC
Hadoop
Hudi
Flink CDC
K8S
数据湖
TOTC
累计撰写
307
篇文章
累计阅读
104.3万
次
首页
栏目
大数据
Flink
后端
Java
笔记
运维
页面
搜索到
27
篇与
TOTC
的结果
返回首页
2022-12-03
Flink 任务执行流程源码解析
用户提交Flink任务时,通过先后调用transform()——>doTransform()——>addOperator()方法,将map、flatMap、filter、process等算子添加到List<Transformation<?>> transformations集合中。在执行execute()方法时,会使用StreamGraphGenerator的generate()方法构建流拓扑StreamGraph(即Pipeline),数据结构属于有向无环图。在StreamGraph中,StreamNode用于记录算子信息,而StreamEdge则用于记录数据交换方式,包括以下几种Partitioner:{dotted startColor="#b3b2b2" endColor="#b3b2b2"/}{dotted startColor="#b3b2b2" endColor="#b3b2b2"/}Partitioner类都是StreamPartitioner类的子类,它们通过实现isPointwise()方法来确定自身的类型。一种是ALL_TO_ALL,另一个种是POINTWISE。/** * A distribution pattern determines, which sub tasks of a producing task are connected to which * consuming sub tasks. * * <p>It affects how {@link ExecutionVertex} and {@link IntermediateResultPartition} are connected * in {@link EdgeManagerBuildUtil} */ public enum DistributionPattern { /** Each producing sub task is connected to each sub task of the consuming task. */ ALL_TO_ALL, /** Each producing sub task is connected to one or more subtask(s) of the consuming task. */ POINTWISE }ALL_TO_ALL意味着上游的每个subtask需要与下游的每个subtask建立连接。{dotted startColor="#b3b2b2" endColor="#b3b2b2"/}{dotted startColor="#b3b2b2" endColor="#b3b2b2"/}POINTWISE则是上游的每个subtask和下游的一个或多个subtask连接。{dotted startColor="#b3b2b2" endColor="#b3b2b2"/}{dotted startColor="#b3b2b2" endColor="#b3b2b2"/}StreamGraph构建完成后,,会通过 PipelineExecutorUtils.getJobGraph()构建JobGraph,具体流程是:——>PipelineExecutorUtils.getJobGraph() ——>FlinkPipelineTranslationUtil.getJobGraph() ——>StreamGraphTranslator.translateToJobGraph() ——>StreamGraph.getJobGraph() ——>StreamingJobGraphGenerator.createJobGraph()JobGraph是优化后的StreamGraph,如果相连的算子支持chaining,合并到一个StreamNode,chaining在StreamingJobGraphGenerator的setChaining()方法中实现:/** * Sets up task chains from the source {@link StreamNode} instances. * * <p>This will recursively create all {@link JobVertex} instances. */ private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes) { // we separate out the sources that run as inputs to another operator (chained inputs) // from the sources that needs to run as the main (head) operator. final Map<Integer, OperatorChainInfo> chainEntryPoints = buildChainedInputsAndGetHeadInputs(hashes, legacyHashes); final Collection<OperatorChainInfo> initialEntryPoints = chainEntryPoints.entrySet().stream() .sorted(Comparator.comparing(Map.Entry::getKey)) .map(Map.Entry::getValue) .collect(Collectors.toList()); // iterate over a copy of the values, because this map gets concurrently modified for (OperatorChainInfo info : initialEntryPoints) { createChain( info.getStartNodeId(), 1, // operators start at position 1 because 0 is for chained source inputs info, chainEntryPoints); } }将符合chaining条件的,合并到一个StreamNode条件如下:1. 下游节点输入边只有一个 2. 与下游属于同一个SlotSharingGroup 3. 数据分发策略Forward 4. 流数据交换模式不是批量模式 5. 上下游并行度相等 6. StreamGraph中chaining为true streamGraph 是可以 chain的 7. 算子是否可以链化areOperatorsChainable代码如下:public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) { StreamNode downStreamVertex = streamGraph.getTargetVertex(edge); return downStreamVertex.getInEdges().size() == 1 && isChainableInput(edge, streamGraph); } private static boolean isChainableInput(StreamEdge edge, StreamGraph streamGraph) { StreamNode upStreamVertex = streamGraph.getSourceVertex(edge); StreamNode downStreamVertex = streamGraph.getTargetVertex(edge); if (!(upStreamVertex.isSameSlotSharingGroup(downStreamVertex) && areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph) && (edge.getPartitioner() instanceof ForwardPartitioner) && edge.getExchangeMode() != StreamExchangeMode.BATCH && upStreamVertex.getParallelism() == downStreamVertex.getParallelism() && streamGraph.isChainingEnabled())) { return false; } ... ... 从Source节点开始,使用深度优先搜索(DFS)算法递归遍历有向无环图中的所有StreamNode节点。{dotted startColor="#b3b2b2" endColor="#b3b2b2"/}{dotted startColor="#b3b2b2" endColor="#b3b2b2"/}待续 ... ...
2022年12月03日
1,316 阅读
31 点赞
2021-11-20
Flink实时计算问题记录与解决方案
当Flink任务的并行度大于Kafka分区数时,可能会导致部分并行度空闲,进而影响水位线(watermark)的生成。为了解决这个问题,可以通过设置withIdleness来进行调整:WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withIdleness(Duration.ofSeconds(60))对于withIdleness参数,应避免将下游任务设置得太小。原因在于,如果上游任务因故障停止,而其恢复所需时间超过了下游任务设置的withIdleness值,那么下游任务会将超时的分区标记为不再消费,导致数据丢失。为避免此问题,建议将分区数设置为不小于任务的并行度,并不设置withIdleness参数,这样可以有效防止潜在的数据丢失情况。{lamp/}kafkaSource指定时间戳消费时,必须为毫秒时间戳,Flink 1.14官网文档为秒,是错误的,指定后不会生效。setStartingOffsets(OffsetsInitializer.timestamp(1654703973000L)){lamp/}要实现Flink与Kafka的端到端一致性,需要确保Kafka的版本不低于2.5。要注意的是,Flink 1.14.2中flink-connector所包含的kafka-clients版本是2.4.X。because of a bug in the Kafka broker (KAFKA-9310). Please upgrade to Kafka 2.5+. If you are running with concurrent checkpoints, you also may want to try without them.Flink-Kafka端到端一致性需要设置TRANSACTIONAL_ID_CONFIG = "transactional.id",如果不设置,从checkpoint重启会报错:OutOfOrderSequenceException: The broker received an out of order sequence number。{lamp/}Flink CDC同步mysql时,需要把binlog配置成ROW模式,查看命令和配置方法如下:show variables like 'binlog_format%'; vi /etc/my.cnf binlog_format=row systemctl restart mariadb.service非ROW模式时会报以下错误:Caused by: org.apache.flink.table.api.ValidationException: The MySQL server is configured with binlog_format MIXED rather than ROW, which is required for this connector to work properly. Change the MySQL configuration to use a binlog_format=ROW and restart the connector.Flink 1.14.2版本使用CDC 2.2,需要编译CDC源码进行版本适配:1.pom文件中修改flink版本为1.14.2、scala版本为2.12.72.修改flink-table-planner-blink为flink-table-planner;flink-table-runtime-blink为flink-table-runtime3.flink-shaded-guava版本由30.1.1-jre-14.0修改为18.0-13.0修改完成后,会出现部分import报错的情况。需要根据新依赖版本中的路径和类进行相应的修改,例如,将创建TimestampFormat的代码修改为:TimestampFormat timestampOption = JsonFormatOptionsUtil.getTimestampFormat(formatOptions)。在编译过程中,首先需要使用install命令对父module进行安装。这样可以确保本地Maven仓库中包含各个子module的JAR包。对子module进行打包时,如Flink MySQL CDC,可以在子module的POM文件中修改打包方式,将所有依赖项都打包到一个JAR文件中,这样在工程中只需引入一个<dependency>即可。否则,会因为缺少某些依赖报错,如:Could not initialize class io.debezium.connector.mysql.MySqlConnectorConfig
2021年11月20日
1,630 阅读
10 点赞
2020-09-19
CPU使用率突增问题排查记录-木马攻击
1.使用top或者ps -aux | sort -k4nr | head -n 10命令查看占用cpu较高的进程2.使用systemctl status 19084查看Main PID,可以看到是定时任务3.使用cd /proc/681ls -ail查看所在目录,杀掉进程、删除相关文件夹
2020年09月19日
1,250 阅读
77 点赞
2020-01-11
JVM垃圾收集行为分析方法
尽管某些监控工具,如:jvisualvm,可以实时提供垃圾收集图表和指标,但它们并没有提供GC行为完整的详细信息,GC日志是研究垃圾收集行为的最佳信息来源。启用GC日志 可以通过指定以下JVM参数来启用GC日志:Java 8及以下版本-XX:+PrintGCDetails -Xloggc:<gc-log-file-path> Example: -XX:+PrintGCDetails -Xloggc:/opt/tmp/myapp-gc.logJava 9及以上版本-Xlog:gc*:file=<gc-log-file-path> Example: -Xlog:gc*:file=/opt/tmp/myapp-gc.log注意事项 一般需要观察24小时的GC日志,这样就会同时看到高流量和低流量的情况。 建议从生产环境中收集GC日志,因为垃圾收集行为受流量模式的影响很大,在测试环境中很难模拟生产流量。我们进行2次测试:基线测试——使用JMeter工具在没有启用垃圾收集GC日志的情况下运行应用程序20分钟,同时有200个并发用户。GC日志启用测试——使用相同的JMeter脚本运行应用程序并启用垃圾收集GC日志,持续时间为20分钟,同时有200个并发用户。对比结果如下:如图所示,CPU和内存消耗没有明显差异,同样,平均响应和事务吞吐量也没有明显差异,通过实验可以看出,GC日志在生产服务器中增加的开销可以忽略不计。分析工具 捕获GC日志后,可以使用以下免费工具之一来分析GC日志:1.GCeasy 2.IBM GC & Memory visualizer 3.HP Jmeter4.Garbage Cat
2020年01月11日
665 阅读
13 点赞
2019-04-20
Redis集群部署方案
系统包安装 配置操作系统yum 源安装以下系统包安装gcc:yum install gcc安装zlib:yum install zib安装ruby:yum install ruby 2.0以上安装rubygems:yum install rubygemsRedis 安装 在redis 官网https://redis.io/download下载 redis-3.2.9.tar.gz拷贝redis-3.2.9.tar.gz 到/application/search解压 tar –zxvf redis-3.2.9.tar.gz安装 cd src && make && make test && make install修改配置 进入cd /application/search/ redis-3.2.9复制 cp redis.conf redis6400.conf修改 redis6400.conf 里面的参数port 6400 --端口maxmemory 2g –内存大小cluster-enabled yes –开启集群模式dir /data0/redis 数据文件存放位置详细配置请见 服务器上配置启动停止 以 172.16.0.9 6400为例:启动:redis-server redis6400.conf停止: redis-cli –c –h 172.16.0.9 –p 6400 shutdown构建集群 在三台172.16.0.9,172.16.0.8 172.16.0.6 安装完成redis以后构建三主三从的高可用redis集群在任一台机器执行命令:/application/search/redis-3.2.9/src/redis-trib.rb create --replicas 2 172.16.0.9:6400 172.16.0.9:6401 172.16.0.9:6402 172.16.0.6:6500 172.16.0.6:6501 172.16.0.6:6502 172.16.0.8:6600 172.16.0.8:6601 172.16.0.8:6602
2019年04月20日
890 阅读
38 点赞
1
2
3
...
6