首页
Search
1
JAVA垃圾回收
993 阅读
2
Kafka、RocketMQ消息队列总结
981 阅读
3
Flink on Kubernetes 计算和存储分离落地实践
970 阅读
4
Linux免密登陆-ubuntu
904 阅读
5
Redis集群部署方案
890 阅读
大数据
Flink
后端
Java
笔记
运维
游客
Search
标签搜索
大数据
Flink
离线
实时
Redis
OpenJDK
Java
笔记
JVM
Elasticsearch
GC
Hadoop
Hudi
Flink CDC
K8S
数据湖
TOTC
累计撰写
307
篇文章
累计阅读
104.3万
次
首页
栏目
大数据
Flink
后端
Java
笔记
运维
页面
搜索到
27
篇与
TOTC
的结果
返回首页
2024-12-13
构建智能灵活的 Ribbon 负载均衡策略
分布式系统架构中,负载均衡是确保系统高可用性和性能的关键环节。以线上服务为例,某个服务原本部署了4个实例来应对大量的请求流量。然而,意外情况发生,其中一个实例所在的机房出现故障,导致其响应速度变得极为缓慢,但是仍然和Nacos注册中心保持着心跳。而当时所采用的负载均衡策略是轮询策略 RoundRobinRule,这一策略在正常情况下能够较为均匀地分配请求,但在面对这种异常情况时,却暴露出了明显的局限性。由于轮询策略的特性,它不会根据实例的实际响应情况进行动态调整,这就使得故障实例上仍然会有大量请求持续堆积。随着时间的推移,发现该实例所在机器的 close_wait 连接数急剧增加,导致整个机器负载加重。为了解决这一问题,调研了一些传统的应对策略:其一,配置超时失败重试机制 ... httpclient: response-timeout: 30s。故障实例响应慢时,自动失败路由到其他实例进行重试,从而使上游的请求最终能够成功。但故障服务实例的流量并没有得到有效的控制和调整。这意味着故障实例和所在机器仍然在承受着巨大的压力。其二,采用熔断策略 Sentinel、Resilience4J、Hystrix。在响应时间/出错百分比/线程数等达到阈值时进行降级、熔断,以保护其他服务实例不受影响。然而,在该场景中,由于还有 3/4 的实例处于正常可用状态,直接进行熔断操作显得过于激进。其三,考虑使用权重轮询策略 WeightedResponseTimeRule。根据服务实例的性能表现动态地分配权重,性能好的实例会被分配更多的请求,而性能差的实例则会逐渐减少请求分配。但该场景下,故障机器的响应时间与正常服务相比已经不在一个数量级,其 QPS 却依然很高。这就导致在权重轮询策略下,故障机器的服务权重会迅速降低,几乎不再接收请求。而且由于我们的配置是在网关层面,当故障机器恢复后,系统无法自动重新计算权重,使得分配到故障机器的流量很少,其权重也很难再次提升上去。基于以上困境,决定对权重轮询策略进行二次开发,使其更加智能,以最大限度地减小请求端的影响。首先增加过滤器RibbonResponseFilter。这个过滤器的主要作用是计算每个服务实例的响应时间,并将其记录到 ServerStats 中。同时,它还会记录请求的返回状态,如果返回状态不是 200,就将其转化为请求超时,并相应地减小该服务的权重。@Component @Slf4j public class RibbonResponseFilter implements GlobalFilter, Ordered { @Autowired protected final SpringClientFactory springClientFactory; public static final String RQUEST_START_TIME = "RequestStartTime"; public static final double TIME_WEIGHT = 30000; public RibbonResponseFilter(SpringClientFactory springClientFactory) { this.springClientFactory = springClientFactory; } @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { exchange.getAttributes().put(RQUEST_START_TIME, System.currentTimeMillis()); return chain.filter(exchange).then(Mono.fromRunnable(() -> { URI requestUrl = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR); Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR); LoadBalancerContext loadBalancerContext = this.springClientFactory.getLoadBalancerContext(route.getUri().getHost()); ServerStats stats = loadBalancerContext.getServerStats(new Server(requestUrl.getHost(), requestUrl.getPort())); long orgStartTime = exchange.getAttribute(RQUEST_START_TIME); long time = System.currentTimeMillis() - orgStartTime; // 响应时间超过 5s 或者服务异常时,减小权重 if (exchange.getResponse().getStatusCode().value()!= 200 || time > 5000) { log.info("The abnormal response will lead to a decrease in weight : {} ", requestUrl.getHost()); stats.noteResponseTime(TIME_WEIGHT); } })); } @Override public int getOrder() { return Ordered.LOWEST_PRECEDENCE; } }增加这个过滤器的原因在于,无论是使用自定义的负载均衡策略,还是内置的 WeightedResponseTimeRule,都无法自动获取到每个服务实例的总请求次数、异常请求次数以及响应时间等关键参数。通过这个过滤器,能够有效地收集这些信息,为后续的权重计算和调整提供有力的数据支持。在注册权重更新 Timer(默认 30s)的同时,同时注册了一个权重重置 Timer(5m)。这样一来,当故障服务实例恢复后,在 5 分钟内,它就能够重新参与到负载均衡的分配中。以下是相关的代码片段:void resetWeight() { if (resetWeightTimer!= null) { resetWeightTimer.cancel(); } resetWeightTimer = new Timer("NFLoadBalancer-AutoRobinRule-resetWeightTimer-" + name, true); resetWeightTimer.schedule(new ResetServerWeightTask(), 0, 60 * 1000 * 5); ResetServerWeight rsw = new ResetServerWeight(); rsw.maintainWeights(); Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { public void run() { logger.info("Stopping NFLoadBalancer-AutoRobinRule-ResetWeightTimer-" + name); resetWeightTimer.cancel(); } })); } public void maintainWeights() { ILoadBalancer lb = getLoadBalancer(); if (lb == null) { return; } if (!resetServerWeightAssignmentInProgress.compareAndSet(false, true)) { return; } try { logger.info("Reset weight job started"); AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb; LoadBalancerStats stats = nlb.getLoadBalancerStats(); if (stats == null) { return; } Double weightSoFar = 0.0; List<Double> finalWeights = new ArrayList<Double>(); for (Server server : nlb.getAllServers()) { finalWeights.add(weightSoFar); } setWeights(finalWeights); } catch (Exception e) { logger.error("Error reset server weights", e); } finally { resetServerWeightAssignmentInProgress.set(false); } }在采用此负载均衡策略时,若重置权重后服务仍未修复,由于配置了超时重试机制,请求端可毫无察觉。与此同时,该服务实例的权重会迅速在短时间内再次降至极低水平,如此循环,直至实例恢复正常。此策略有效地处理了线上服务可能遭遇的各类异常状况。
2024年12月13日
77 阅读
1 点赞
2023-12-30
Flink + Hudi 流批一体作业稳定性优化
目前,任务分为两种类型:当业务逻辑较为简单时,使用 Flink SQL 进行处理,例如将原始日志或业务库同步至 Hudi 的 ODS 层、进行多表关联和聚合等操作;当业务逻辑比较复杂或需要特殊处理时,例如部分数据需要通过 API 获取,则使用 Flink DataStream API 消费 Hudi 数据,并经过一系列处理后,写入 Hudi 的 DWD、DWS 或 ADS 层。针对内存方面的问题,对Flink写入Hudi任务多个Task的Heap使用率进行了监控。注意到使用率存在较大的波动,有时甚至可能超出分配的阈值,这种情况可能导致任务被强制终止。Yarn Application报错信息: org.apache.flink.runtime.entrypoint.ClusterEntrypoint - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested. Association with remote system [akka.tcp://flink] has failed, address is now Association with remote system [akka.tcp://flink] has failed,address is now gated for [50] ms NodeManager Local logs报错信息: org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Container [pid=13427,containerID=container_e632_1670877056012_20807318_01_000001] is running beyond physical memory limits. Current usage: 2.0 GB of 2 GB physical memory used; 4.5 GB of 4.2 GB virtual memory used. Killing container.可以通过在yarn-site.xml中将yarn.nodemanager.vmem-check-enabled设为false,或者为jobmanager和taskmanager分配足够的内存来提高任务的稳定性。<property> <name>yarn.nodemanager.vmem-check-enabled</name> <value>false</value> </property> 注:是否启用一个线程检查每个任务正在使用的虚拟内存量,如果任务超出了分配值,则直接将其kill,默认是true另一个问题是,Managed Memory的使用率较低,而Task Heap的使用率却相当高。在增加TaskManager的内存分配(通过-ytm参数)时,会根据taskmanager.memory.managed.fraction的设置将新增的内存分配给Managed Memory。这可能会造成资源浪费。适当减小该配置的值有助于提高整体内存利用率。taskmanager.memory.managed.fraction: 0.2使用Hudi写入可能导致节点过载,从而导致上游算子背压。在执行SQL时,可以考虑提高写入并行度。如果是DateStream API Hudi到Hudi的数据传输,由于数据分布可能不均,建议在读取后直接使用rebalance()算子进行数据均衡处理。对于当前分区文件数量问题,写入Hudi任务的并行度会直接影响文件数量,随着并行度的增加,文件数量也会相应增加。此外,对于COPY_ON_WRITE模式,以下三个参数会直接影响文件数量。任务在运行一段时间后,当合并操作和触发操作达到平衡时,文件数量会在很小范围内波动。此外,可以通过减小clustering.delta_commits来减少当前分区文件数量。clustering.delta_commits 触发合并文件所需的提交次数 clean.retain_commits 保留的提交数,不进行清理 cleaner.policy(KEEP_LATEST_COMMITS)相对于在线clustering而言,离线clustering更加稳定并且可以实现资源隔离。经过大量任务长时间运行后来看,在使用离线clustering时需要注意以下问题。常用离线clustering命令如下:nohup flink run \ -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob \ lib/${bundle_jar_name}.jar \ --service \ --path ${table_path} > clustering.log & \以上离线clustering运行时,--plan-partition-filter-mode默认为NONE,所有符合条件的分区都会加入到任务当中,根据使用情况来看,随着历史分区的增加,资源使用逐渐处于较高的水平,该模式比较适用于临时处理大量滞留文件。同时,当数据量较大时还存在一个问题,当Clustering异常时,重启任务,偶尔会报错提示历史分区某parquet不存在(社区群有同学反馈过该问题)。通过配置参数--plan-partition-filter-mode RECENT_DAYS --target-partitions 2可以解决该问题。还可以使用--plan-partition-filter-mode的SELECTED_PARTITIONS模式,划定历史分区的范围,处理积压问题。Flink离线Clustering使用建议,分区过滤模式RECENT_DAYS和NONE的组合使用相对较为稳定,适用于线上环境。当历史分区中存在大量待Clustering的文件时,可以临时使用NONE模式压缩几次。等到压缩速度跟得上时,再切换回RECENT_DAYS模式进行进一步的处理,最终的离线Clustering命令:常规: nohup flink run \ -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob \ lib/hudi-flink1.14-bundle-0.12.3.jar \ --service \ --clustering-delta-commits 8 \ --small-file-limit 90 \ --target-file-max-bytes 125829120 \ --plan-partition-filter-mode RECENT_DAYS \ --target-partitions 2 \ --path hdfs://...... > clustering.log & 处理积压: flink run -c \ org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob \ lib/hudi-flink1.14-bundle-0.12.3.jar \ --service \ --min-clustering-interval-seconds 3 \ --max-num-groups 1000 \ --clustering-delta-commits 1 \ --small-file-limit 90 \ --target-file-max-bytes 125829120 \ --path ${target_path}在MERGE_ON_READ模式下,当前分区的文件数量与compaction.delta_commits、clean.retain_commits有关。但是当clean.retain_commits大于10时,可能会出现parquet和log文件无法被清理干净的情况,导致部分小文件滞留在历史分区中。HoodieFlinkClusteringJob类可以接受多个参数,可以在源码org.apache.hudi.sink.clustering.FlinkClusteringConfig中或使用如下命令查看:flink run \ -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob \ ../flink-1.14.2/lib/hudi-flink1.14-bundle-0.12.3.jar \ -h需要注意的是Boolean类型的参数,默认都为false,如果要开启,直接使用--param_name即可,而不是--param_name true,Hudi中解析参数使用的JCommander,如下图所示,如果Boolean参数后面跟true、false、0、1,都会在遍历参数时错位,导致任务提交失败,报错提示和这种配置方式并不是很友好,很容易让使用者误认为该参数是不可配置的!通过适当增加以下两个超时参数,可以有效避免因网络波动等原因导致的超时问题,从而降低任务失败的概率。set execution.checkpointing.timeout=600000 properties.request.timeout.ms=120000(connector为kafka时)以下参数设置为true时,如果TaskManager发生了Akka错误,例如无法与JobManager通信或其他网络问题,TaskManager将会立即退出;设置为false时,TaskManager将不会主动退出,而是继续运行,这可能会导致系统处于不稳定状态,并且需要人工干预来解决问题。taskmanager.exit-on-fatal-akka-error: true改动Hudi源码或者适配低版本大数据组件,参考了如下文章,可顺利完成编译打包:https://blog.csdn.net/weixin_45417821/article/details/127407461mvn clean package -DskipTests -Drat.skip=true -Pflink-bundle-shade-hive3 -Dflink1.14 -Dscala-2.12 mvn clean install -DskipTests -Dscala-2.12 -Pflink-bundle-shade-hive3
2023年12月30日
1,946 阅读
15 点赞
2023-07-01
Flink流批一体作业管理平台
前言随着大数据处理需求的不断增长,流批一体作业管理平台的重要性愈发凸显。本文将介绍如何针对 Flink + Hudi 流批一体任务进行管理,特别针对 Hudi 任务的需求,支持一键启动离线 Compaction 和 Clustering 任务,保证数据湖的稳定运行。背景在大数据处理领域,简化作业的提交、监控和管理是一项重要的任务。调研过程中发现,现有的开源系统如 Dinky 和 StreamPark 在任务生成和监控方面存在一定局限性。它们使用了 Flink 或 YARN 源码中的 API,如 org.apache.hadoop.yarn.client.api.YarnClient 、 org.apache.hadoop.hdfs.DistributedFileSystem 等,导致系统与 Hadoop 集群和 Flink 客户端版本之间耦合度高,Hadoop集群使用权限要求较高。系统开发基于项目 flink-streaming-platform-web 进行开发,通过优化和功能扩展(如:对使用FlinkSQL操作Hudi表的支持),构建了一个无侵入性、与 Hadoop 集群和 Flink 客户端版本解耦的流批一体作业调度平台。只需在装有 Flink 客户端的机器上启动系统,即可轻松使用,所有任务均使用 Flink 自带的命令进行提交,如:# 提交 Compaction 任务 flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor ... # 提交 Clustering 任务 flink run -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob ...核心功能流任务管理支持 FlinkSQL 和 JAR 提交任务,提供流式作业的提交、监控、告警和日志查看等功能,为用户提供全方位的作业管理服务。批任务调度支持 FlinkSQL 和 JAR 提交任务,可定时调度批处理作业,定时完成数据处理任务,提高数据处理效率。数据湖管理(Hudi)特别针对 Hudi 任务的需求,系统支持对 MOR(Merge On Read)和 COW(Copy On Write)模式下的离线 Compaction 和离线 Clustering 任务进行管理,保证数据湖的稳定运行和数据质量。架构设计该平台的架构设计是其高效运行的关键,采用了模块化设计,将流任务管理、批任务调度和数据湖管理等功能模块化,实现了高度的灵活性和可扩展性。每个模块都具有清晰的职责和接口,使得系统易于维护和扩展。流任务管理模块负责接收用户提交的流式作业,并将其转换为 Flink 任务进行执行。这个模块需要实现任务的监控和告警功能,以确保作业的稳定运行。采用了分布式监控系统来实现实时监控和告警,保障了系统的高可用性和可靠性。批任务调度模块则负责定时调度批处理作业,并在预定的时间点执行数据处理任务。这个模块需要考虑到作业的依赖关系和执行顺序,以确保数据处理任务按时完成。采用了依赖调度策略来解决这个问题,有效地提高了作业的执行效率。数据湖管理模块是该平台的重要组成部分,特别针对 Hudi 任务的需求进行了优化。实现了对 MOR 和 COW 模式下的离线 Compaction 和离线 Clustering 任务的管理,确保了数据湖的稳定运行和数据质量。该平台极大地提升了流批一体作业创建的效率和灵活性,提供了更便捷、可靠的作业管理解决方案。未来,将持续优化系统功能,以满足需求,并助力流批一体作业的稳定运行和管理。
2023年07月01日
1,359 阅读
91 点赞
2023-03-25
Flink on Kubernetes 计算和存储分离落地实践
云原生已成为业界的主要趋势之一。将Flink从Yarn迁移到Kubernetes平台带来了许多优势。在这种架构下,将计算和存储解耦,计算部分运行在Kubernetes上,而存储则使用HDFS等分布式存储系统。这样的架构优势在于可以根据实际情况独立调整计算和存储资源,从而提高整体的效率和弹性。本文将介绍四种Flink在Kubernetes上的部署模式。其中,两种是基于Native Kubernetes部署的,分别有Session模式和Application模式。另外两种是基于Flink Kubernetes Operator部署的,同样包括Session模式和Application模式。首先介绍基于Flink Kubernetes Operator部署的Application模式。如果要运行自己编写的jar包,需要先构建一个镜像。如果使用了HDFS、Hudi等其他组件,还需要在Dockerfile中将Hadoop客户端和配置文件复制到镜像中,并设置相应的环境变量。同时,将所有依赖的jar包复制到Flink Home的lib目录下。FROM flink:1.16.1-scala_2.12 USER root RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone ADD hadoop-3.1.1.tar.gz /opt ADD jdk1.8.0_121.tar.gz /opt RUN rm /opt/hadoop-3.1.1/share/hadoop/common/lib/commons-math3-3.1.1.jar RUN rm /opt/hadoop-3.1.1/share/hadoop/hdfs/lib/commons-math3-3.1.1.jar COPY commons-math3-3.6.1.jar /opt/hadoop-3.1.1/share/hadoop/common/lib/ COPY commons-math3-3.6.1.jar /opt/hadoop-3.1.1/share/hadoop/hdfs/lib/ RUN chmod -R 777 /opt/hadoop-3.1.1/share/hadoop/common/lib/ RUN mkdir -p /opt/hadoop/conf/ COPY yarn-site.xml /opt/hadoop/conf/ COPY core-site.xml /opt/hadoop/conf/ COPY hdfs-site.xml /opt/hadoop/conf/ COPY flink-shaded-hadoop-3-uber-3.1.1.7.0.3.0-79-7.0.jar $FLINK_HOME/lib/ COPY commons-cli-1.5.0.jar $FLINK_HOME/lib/ RUN mkdir $FLINK_HOME/mylib COPY xxx-1.0-SNAPSHOT.jar $FLINK_HOME/mylib RUN chown -R flink:flink $FLINK_HOME/mylib RUN echo 'export JAVA_HOME=/opt/jdk1.8.0_121 \n\ export HADOOP_HOME=/opt/hadoop-3.1.1 \n\ PATH=$PATH:$JAVA_HOME/bin \n\ PATH=$PATH:$HADOOP_HOME/bin'\ >> ~/.bashrc EXPOSE 8081构建镜像,在Dockerfile所在的目录中执行以下命令,确保该目录包含用于构建镜像的文件。docker build -t flink-native/flink-on-k8s-xxxx .安装helmcurl https://baltocdn.com/helm/signing.asc | sudo apt-key add -sudo apt-get install apt-transport-https --yes echo "deb https://baltocdn.com/helm/stable/debian/ all main" | sudo tee /etc/apt/sources.list.d/helm-stable-debian.list sudo apt-get update sudo apt-get install helm安装cert-manager组件,由它提供证书服务。kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml安装Flink Kubernetes Operatorhelm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.4.0/ helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator --namespace flink --create-namespace执行上述命令后,将会从ghcr.io/apache/flink-kubernetes-operator:7fc23a1镜像仓库拉取镜像。由于下载速度较慢,可以尝试从apache/flink-kubernetes-operator:7fc23a1仓库拉取镜像,然后为其添加标签docker tag apache/flink-kubernetes-operator:7fc23a1 ghcr.io/apache/flink-kubernetes-operator:7fc23a1如果在重新安装时遇到使用kubectl delete无法删除的情况,可以尝试以下命令来实现删除操作:kubectl patch crd/flinksessionjobs.flink.apache.org -p '{"metadata":{"finalizers":[]}}' --type=merge通过执行上述命令,可以成功删除该资源。查看自定义资源 kubectl get customresourcedefinition构建一个YAML文件来提交任务。其中,image指定了镜像,jarURI指定了jar包在镜像中的位置,entryClass指定了要执行的类,args指定了该类所需的参数:kind: FlinkDeployment metadata: name: flink-application-xxx spec: image: flink-native/flink-on-k8s-xxxx flinkVersion: v1_16 flinkConfiguration: taskmanager.numberOfTaskSlots: "n" serviceAccount: flink jobManager: resource: memory: "nm" cpu: n taskManager: resource: memory: "nm" cpu: n job: jarURI: local:///opt/flink/mylib/xxx-1.0-SNAPSHOT.jar entryClass: com.xxx.run.XXXJob parallelism: n upgradeMode: stateless args: ["hdfs://host:9000/data/input","hdfs://host:9000/data/output","n"]提交任务kubectl create -f xxx.yaml查看flinkdeployment kubectl get flinkdeployment查看日志kubectl logs -f deploy/flink-application-xxx{lamp/}Flink on K8S Session模式和Application模式需要安装Flink客户端,下载flink压缩包,解压即可设置命名空间首选项、赋权等kubectl create ns flink-native kubectl config set-context --current --namespace=flink-native kubectl create serviceaccount flink kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=cluster-admin --serviceaccount=flink-native:flink --namespace=flink-nativeSession模式,启动Flink集群bin/kubernetes-session.sh \ -Dkubernetes.cluster-id=xxx\ -Dkubernetes.container.image=flink-native/flink-on-k8s-xxxx \ -Dkubernetes.namespace=flink-native\ -Dkubernetes.service-account=flink \ -Dclassloader.check-leaked-classloader=false \ -Dkubernetes.rest-service.exposed.type=ClusterIP \ -Dtaskmanager.memory.process.size=4096m \ -Dkubernetes.taskmanager.cpu=2 \ -Dtaskmanager.numberOfTaskSlots=4 \ -Dresourcemanager.taskmanager-timeout=60000 \ -Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%"端口转发nohup kubectl -n flink-native port-forward --address 0.0.0.0 service/my-first-flink-cluster-rest 8081:8081 >port-forward.log &打开Flink Web UI,可以看到此时只有jobmanager向集群提交任务,运行测试任务bin/flink run -e kubernetes-session -Dkubernetes.namespace=flink-native -Dkubernetes.container.image=flink-native/flink-on-k8s-xxxx -Dkubernetes.rest-service.exposed.type=NodePort -Dkubernetes.cluster-id=my-first-flink-cluster examples/streaming/TopSpeedWindowing.jar运行自己的jar包bin/flink run -e kubernetes-session \ -Dkubernetes.namespace=flink-native \ -Dkubernetes.rest-service.exposed.type=NodePort \ -Dkubernetes.cluster-id=xxx \ -c com.xxx.run.XXXJob \ mylib/xxx-1.0-SNAPSHOT.jar hdfs://host:9000/data/input hdfs://host:9000/data/output 2查看pod,此时可以看到生成了taskmanagerkubectl get pod -o wide -A查看日志,使用以下命令可以看到测试程序TopSpeedWindowing的输出结果kubectl logs my-first-flink-cluster-taskmanager-1-1 -n flink-native查看任务列表:bin/flink list --target kubernetes-session -Dkubernetes.namespace=flink-native -Dkubernetes.jobmanager.service-account=flink -Dkubernetes.cluster-id=xxx 根据ID删除任务:bin/flink cancel --target kubernetes-session -Dkubernetes.namespace=flink-native -Dkubernetes.jobmanager.service-account=flink -Dkubernetes.cluster-id=xxxxr 3ff3c5a5e3c2f47e024e2771dc108f77Application模式bin/flink run-application \ --target kubernetes-application \ -Dkubernetes.cluster-id=xxx\ -Dkubernetes.container.image=flink-native/flink-on-k8s-xxxx \ -Dkubernetes.namespace=flink-native\ -Dkubernetes.service-account=flink \ -Dclassloader.check-leaked-classloader=false \ -Dkubernetes.rest-service.exposed.type=ClusterIP \ -c com.sohu.longuserprofile.run.TestJob \ local:///opt/flink/mylib/xxx-1.0-SNAPSHOT.jar hdfs://host:9000/data/input hdfs://host:9000/data/output 2Session模式只能提交本地(宿主机)jar包,Application模式只能使用local:///{lamp/}常用命令k8s web ui 登录token获取 kubectl -n kubernetes-dashboard describe secret $(kubectl -n kubernetes-dashboard get secret | grep dashboard-admin | awk '{print $1}') | grep token: 查看所有pod列表 kubectl get pod -o wide -A 查看pod详细信息 kubectl describe pod pod_name -n flink-native 删除deployment kubectl delete deployment/my-first-flink-cluster 进入pod kubectl exec -it -n flink-native pod_name /bin/bash 获得所有命名空间 kubectl get namespace 拷贝出来 kubectl cp -n application psqls-0:/var/lib/postgresql/data/pg_wal /home 拷贝进去 kubectl cp /home/dades/pg_wal -n application psqls-0:/var/lib/postgresql/data/pg_wal
2023年03月25日
970 阅读
6 点赞
2023-02-04
调度系统核心服务迁移方案演进
系统使用Zookeeper进行主从服务选举,对于频繁更新数据库的服务S1和S2,迁移操作需要确保高可用性并且不容忍数据丢失。首先,对数据库进行扩展,引入DB3。要确保DB3的数据与主数据库DB1和从数据库DB2完全一致,需要执行库锁操作。由于服务对数据库的操作不支持等待或失败重试,所以无法对主数据库进行锁定。因此,考虑了以下方案:1.对DB2进行锁库,此时DB2是静态的flush tables with read lock;2.将DB2的历史数据导入DB3主:bin/mysqldump -urecom recom -p >recom.sql 从:bin/mysql -u recom -p recom < recom.sql3.将DB3的Master指向DB2show master status; change master to master_host='10.*.*.*',master_user='un',master_password='pwd123',master_log_file='/opt/mysql_data/binlog.000068',master_log_pos=117844696; start slave; show slave status \G;4.然后解锁DB2,此时延迟同步的数据会同步到DB2,DB2再同步到DB3,从而保证了三个数据库完全一致unlock tables;使用同样的方式,可以扩展DB4,形成链式复制。在将服务配置切换到新的数据库时,主库的切换可能会带来一些问题。由于每个服务都有独立的配置,无法同时对所有服务进行配置更新。后续如果引入NACOS,通过配置热更新可以使迁移变得更加简单。因此,在修改某个服务的主库之前,需要考虑以下情况:假设S1和S2的配置分别为DB1和DB2。现在要切换S2的配置。当S2的主库发生改变后,在切换其他服务的主库之前,可能会出现同时向DB1和DB2/DB3进行更新的情况。这意味着DB1更新的数据会同步到DB2和DB3,但是DB2和DB3更新的数据无法同步到DB1。如果一个请求到达S2并在DB2/DB3中插入了一条数据,接下来的请求到达S1时,当尝试更新上一个请求插入的数据时,就会发生异常。因此,链式复制可以满足数据库的扩展需求,但无法满足服务的扩展需求。在迁移过程中,存在一个关键时间点,其中两个服务可能同时对不同的数据库进行更新操作。然而,一旦更新了主从复制链中间的库,上游数据库将无法同步到最新的数据。如果希望在迁移过程中保持服务的高可用性,必须解决这个问题。需要确保无论哪个服务更新了其中一个数据库,其他数据库都能感知到这个变化。因此,考虑将链式的主从MySQL连接成一个环形结构。在实现这一点时,需要考虑以下两个问题:{card-describe title="问题"}1.在更新数据时,担心在环路上可能会导致死循环的问题。通过进行调研,发现MySQL提供了良好的支持,不会出现这种问题。2.另一个需要考虑的问题是自增主键的冲突。假设在DB1插入了一条数据,在这条数据同步之前,又在DB2插入了一条数据。这样,这两条不同数据的自增主键可能会相同。当DB1和DB2之间通过环路复制传递新增数据时,就会发生主键冲突的情况。{/card-describe}针对上述问题,需要找到解决方案来确保数据同步和主键唯一性。为了解决这个问题,可以通过设置auto_increment_offset和auto_increment_increment来控制不同库之间的自增偏移量和步长。这样,就能够在任何时间点将服务指向环形结构中的任意两个库,从而完成迁移过程。MySQL相关命令:启动和初始化 bin/mysqld --defaults-file=/opt/mysql/my.cnf --initialize --user=root --basedir=/opt/mysql --datadir=/opt/mysql_data bin/mysqld_safe --defaults-file=/opt/mysql/my.cnf --user=root & 获取root密码 cat errorlog.err | grep root@localhost 重置密码和授权 bin/mysql -u root -p set password for 'root'@'localhost' = password(''); CREATE USER 'recom'@'%' IDENTIFIED BY 'recom@123'; GRANT ALL PRIVILEGES ON *.* TO 'recom'@'%' IDENTIFIED BY 'recom@123' WITH GRANT OPTION; flush privileges; 其他命令 stop slave; reset slave all; bin/mysqladmin -uroot -p shutdown
2023年02月04日
446 阅读
2 点赞
1
2
...
6