首页
Search
1
JAVA垃圾回收
993 阅读
2
Kafka、RocketMQ消息队列总结
981 阅读
3
Flink on Kubernetes 计算和存储分离落地实践
970 阅读
4
Linux免密登陆-ubuntu
904 阅读
5
Redis集群部署方案
890 阅读
大数据
Flink
后端
Java
笔记
运维
游客
Search
标签搜索
大数据
Flink
离线
实时
Redis
OpenJDK
Java
笔记
JVM
Elasticsearch
GC
Hadoop
Hudi
Flink CDC
K8S
数据湖
TOTC
累计撰写
307
篇文章
累计阅读
104.3万
次
首页
栏目
大数据
Flink
后端
Java
笔记
运维
页面
搜索到
7
篇与
大数据
的结果
返回首页
2018-08-11
Spark学习笔记
Spark Streaming KafkaUtils.createStream:PUSH,Kafka高级API,数据漏处理或者多处理状况,主题分区与RDD的分区不相关,可开启WAL(数据复制两次)。KafkaUtils.createDirectStream:PULL,Kafka低级API,Kafka和RDD分区之间有一对一的映射关系,不会更新Zookeeper中的偏移量。{lamp/}内存管理 spark on yarn (yarn-cluster)Resource Manager接收到申请后在集群中选择一个Node Manager分配Container,并在Container中启动ApplicationMaster进程,在ApplicationMaster中初始化SparkContext,生成一系列task,ApplicationMaster向Resource Manager申请资源后通知Node Manager在获得的Container中启动Excutor进程,SparkContext分配task给Excutor,Excutor发送运行状态给Driver。一个Container对应一个JVM进程,也就是一个executor,所以JVM的Heap Size取决于spark.executor.memory。堆内存90%以上作为安全空间,如果内存大,可以调高95%。缓存空间是60%(Heep 90%60%)(safetyFraction 和 memoryFraction) ,会负责存储 Persist、Unroll 以及 Broadcast 的数据。Unroll序列化空间是20%(Heep 90%60%*20%)。shuffle空间的安全比例是80%,spark.shuffle.memeoryFraction 0.2(Heep 80% 20%)。Spark1.6之后 联合内存 加入Heap 4G预留内存(Reserved Memory):系统预留内存,会用来存储Spark内部对象。默认是300M,Java Heap大小至少为*1.5=450M。用户内存(User Memory):主要用于存储 RDD 转换操作所需要的数据,例如 RDD 依赖等信息。(Heap-300M)*25%=949M,每个 Executor 分配 1G 的数据就会OOM。Spark Memory, (Heap-300M)*75%,各50%,动态占用机制。Execution 内存:主要用于存放 Shuffle、Join、Sort、Aggregation 等计算过程中的临时数据。Storage 内存:主要用于存储 spark 的 cache 数据,例如RDD的缓存、unroll数据。Storage占用对方内存可能被淘汰,如果没有再磁盘存储会丢失(storage_level ),Execution占用对方内存只能等释放。先借用,再溢写到磁盘,内存优先。启用静态内存管理的方式是:spark.memory.useLegacyMode true{lamp/}RDD有多少分区就有多少task,因为一个task只能处理一个partition上的数据。Spark算子分为transform、action,只有action算子才触发计算(延迟计算)如:countbykey、reduce、count、take(n)、foreach、collect。一个action算子提交一个job、一个job包含一个或者多个stage、stage是根据RDD宽依赖(一个RDD分到两个不同的子RDD)、窄依赖(一个子RDD可以依赖多个父RDD)划分的。Excuotor(包含一个或者多个task,每个task一个虚拟core),Excuotor、task数量可以在submit中设置。总task数量一般设置成总core数的2-3倍,因为有的task可能先执行完。map一条记录变一条记录,function函数返回Object;faltmap一条记录变多条记录,function函数返回Iterable<Object>迭代器。join操作时,可以将reduce join转换成map join,并广播大变量。{lamp/}SHUFFLE 有hashshuffle、sortshuffle、钨丝shuffle,后者会进行排序,一个task一个文件,钨丝shuffle效果跟sort差不多, 使用了自己实现的一套内存管理机制,性能上有很大的提升。hashshuffle,map端写文件时每个task都会创建下一个stage总task数量的文件,可以设置合并,这样,每个excutor中的task就会公用一批文件,先往内存缓存写,再溢出到磁盘,调整大一些可以减少io次数,reduce端拉取文件时有buffer缓冲区,每次都只能拉取与buffer缓冲相同大小的数据。{lamp/}调优 Spark优化还包括,设置kyro序列化方式 ,性能更高、调整RDD持久化内存比例、调整shuffle时reduce端拉取数据重试次数,等待时长(因为在JVM full gc时是stop the world,多尝试几次)、数据本地化等待时长。
2018年08月11日
784 阅读
32 点赞
2018-07-07
数据同步工具DataX、Sqoop和Canal
DataX无法分布式部署,需要依赖调度系统实现多客户端,可以在传输过程中进行过滤,并且可以统计传输数据的信息,因此在业务场景复杂(表结构变更)更适用,同时对于不同的数据源支持更好,同时不支持自动创建表和分区。支持流量控制,支持运行信息收集,及时跟踪数据同步情况。
2018年07月07日
808 阅读
80 点赞
1
2