性能文章>简单的聊一聊Spark的性能优化>

简单的聊一聊Spark的性能优化原创

180100

Spark的性能优化

参数级的优化

spark_driver_memory=4g
spark_num_executors=6
spark_executor_memory=4g
spark_executor_cores=1
spark_executor_memory_over_head=1024
spark_sql_shuffle_partitions=18
spark.default.parallelism=18

主要是这七个参数,这七个个参数的说明如下:

  • spark_driver_memory设置driver的内存大小
  • spark_num_executors设置executors的个数
  • spark_executor_memory设置每个spark_executor_cores的内存大小
  • spark_executor_cores设置每个executor的cores数目
  • spark_executor_memory_over_head设置executor执行的时候,用的内存可能会超过executor-memoy,所以会为executor额外预留一部分内存。该参数代表了这部分内存
  • spark_sql_shuffle_partitions设置executor的partitions个数,注意这个参数只对SparkSQL有用。
  • spark.default.parallelism设置executor的partitions个数,注意这个参数只对SparkRDD有用

对于这七个参数,需要充分理解Spark执行的逻辑才能明白并合适的配置,Spark的执行逻辑如下(这里不再细讲),可以参照这边博客https://www.cnblogs.com/cxxjohnson/p/8909578.html 或官方API
其中关于内存的配置要结合hadoop yarn的集群的资源情况而定,不是越大越好。而对于

spark_num_executors,spark_executor_cores,spark_sql_shuffle_partitions这三个参数,根据实际的经验需满足spark_sql_shuffle_partitions=spark_num_executorsspark_executor_cores,而spark_executor_cores一般保持在再提交任务时:这里写一写我经常用到的参数。

spark-submit \
--master yarn \
--deploy-mode cluster \
--num-executors $spark_num_executors \
--driver-memory $spark_driver_memory \
--executor-memory $spark_executor_memory \
--executor-cores $spark_executor_cores \
--queue yarn_queue_test \
--conf spark.app.name=spark_name_test \
--conf spark.yarn.executor.memoryOverhead=$spark_executor_memory_over_head \
--conf spark.core.connection.ack.wait.timeout=300 \
--conf spark.dynamicAllocation.enabled=false \
--jars test.jar 

Task数据分布的优化

在一般情况下Task数据分配是随机默认的,这样会带来一个问题,如果多大的Task,而只是部分的Task数据处理量大,大部分很小,那么如果能做到将小部分的Task数据处理量优化到和大部分的大致相等,那么性能自然就提升上去了。这样优化分为两步:

a.在执行的Java代码中获取num_executors参数的值,上面的例子是

spark_num_executors=6
 int rddPartition = Integer.parseInt(parameterParse.getNum_executors()) * 3;

b.不管是rdd的遍历还是直接的

session.sql("sql").foreachPartition()在遍历之前加上一个方法repartition(partition)
  session.sql(sqlStr).repartition(partition).foreachPartition(iterator -> {
            while (iterator.hasNext()) {
                Row row = iterator.next();
                //逻辑处理
                }
        });

这样做后,在任务的管理页面看到的executor数据分布式非常均匀的,从而提高性能

分而治之

分而治之是贯穿整个大数据计算的核心,不管是MapReduce,Spark,Flink等等,而这里要说的分而治之可以初略的物理流程上的分而治之,而不是对Spark的driver,executor,Task分而治之,因为本身就是分布式的分而治之。

假设经过反复的性能压力测试,得出Spark在现有规定资源上只有1000000条/s的性能,而现在的数据有一亿条。现在不做任何处理提交session.sql(“sql”).foreachPartition()或rdd.foreachPartition(),虽然最终会处理完,但发现时间是比预定的100000000/1000000s多得多,这样会拖累整体性能,这个时候是可以对现有的一亿条数据做以1000000条为组的组合切割分配成100000000/1000000个集合,对集合数据依次执行,这样性能上会有所提升。当然这种优化方式还是需要跟实际业务逻辑来定.

我这只是做了一些简单的总结1具体的还要具体问题具体分析。

其实我这只是个人的理解,如果有什么不对的希望大家的指正多谢大家。

广播变量

广播变量:Broadcast,将大变量广播出去,而不是直接使用

  • 为什么要用Broadcast

当进行随机抽取一些操作,或者从某个表里读取一些维度的数据,比如所有商品品类的信息,在某个算子函数中要使用到,加入该数据大小为100M,那么1000个task将会消耗100G的内存,集群损失不可估量

  • Broadcast的原理

默认的情况下,每个task执行的算子中,使用到了外部的变量,每个task都会获取一份变量的副本,所以会消耗很多的内存,进而导致RDD持久化内存不够等情况,大大影响执行速度。

广播变量,在driver上会有一份初始的副本,task在运行的时候,如果要使用广播变量中的数据,首先会在自己本地的Executor对应的BlockManager中尝试获取变量副本,并保存在本地的BlockManager中,此后这个Executor上的所有task,都会直接使用本地的BlockManager中的副本,Executor的BlockManager除了从driver上拉取,也可能从其他节点的BlockManager上拉取变量副本,距离越近越好。

总而言之: 广播变量的好处不是每一个task一份变量副本,而是变成每个节点的executor才一份副本,这样的话就可以变量产生的副本大大减少。

shuffle 调优

shuffle原理

什么情况下发生shuffle? 在Spark中,主要有以下几个例子:

  • groupByKey : 把分布在各个节点上的数据中的同一个key对应的value都集中到一块儿,集中到集群中的一个节点中,也即是集中到一个节点的executor的一个task中
  • reduceByKey : 算子函数对values集合进行reduce操作,最后生成一个value
  • countByKey : 在一个task中获取同一个key对应的所有value,然后计数,统计总共有多少value
  • join : 两个RDD,Key相同的两个value都集中到一个executor的task中

shuffle过程:

在一个shuffle过程中,前半部分stage中,每个task都会创建后半部分stage中相同task数量的文件,比如stage后半部分有100个task,那么前半部分的每个task都会创建100个文件(先写入到内存缓冲中,后溢满写入到磁盘),会将同一个key对应的values写入同一个文件中,shuffle后半部分的stage中的task,每个task都会从各个节点的task创建其中一份属于自己的那份文件中,拉取属于自己的key-value对,然后task会有一个内存缓冲区,然后调用HashMap进行key-values的聚合,最终调用我们定义的聚合函数来进行相应的操作

shuffle调优之合并map端输出文件

默认情况下,Spark是不开启合并map端输出文件机制的,所以当分批次执行task时,每批的task都会创建新的文件,而不会共用,大大影响了性能,所以当有大量map文件生成时,需要开启该机制

设置方法

new SparkConf().set(“spark.shuffle.consolidateFiles”, “true”)

  • 设置合并机制之后:

第一个stage,并行运行2个task,运行这两个task时会创建下一个stage的文件,运行完之后,会运行下一批次的2个task,而这一批次的task则不会创建新的文件,会复用上一批次的task创建的文件.

第二个stage的task在拉取上一个stage创建的文件时就不会拉取那么多文件了,而是拉取少量文件,每个输出文件都可能包含了多个task给自己的map端输出。

  • shuffle调优之map端内存缓冲和reduce内存占比。

默认情况下: 每个task的内存缓冲为32kb,reduce端内存占比为0.2(即默认executor内存中划分给reduce task的微20%) 所以在不调优的情况下,如果map端task处理的比较大,内存不足则溢满写入磁盘 比如: 每个task就处理320kb,32kb,总共会向磁盘溢写320 / 32 = 10次。每个task处理32000kb,32kb,总共会向磁盘溢写32000 / 32 = 1000次。同理,ruduce端也一样

何时调优?

通过Spark UI查看shuffle磁盘的read和write是不是很大,如果很大则应相应调优

如何调优?

spark.shuffle.file.buffer : 32kb -> 128kb spark.shuffle.memoryFraction: 0.2 -> 0.3

总结

这篇调优只是针对平时比较简单问题的调优,具有复杂的咱们还要具体问题具体分析。比方说对于RDD的优化,这些我在面试基本知识里面已经讲过,

请先登录,感受更多精彩内容
快去登录吧,你将获得
  • 浏览更多精彩评论
  • 和开发者讨论交流,共同进步

为你推荐

跟Kafka学技术系列之时间轮
kafka的延迟队列使用时间轮实现,能够支持大量任务的高效触发,但是在kafka延迟队列实现方案里还是看到了delayQueue的影子,使用delayQueue是对时间轮里面的bucket放入延迟队列,以此来推动时间轮滚动,但是基于将插入和删除操作则放入时间轮中,将这些操作的时间复杂度都降为O(1)
记一次 Java 服务性能优化
背景前段时间我们的服务遇到了性能瓶颈,由于前期需求太急没有注意这方面的优化,到了要还技术债的时候就非常痛苦了。在很低的 QPS 压力下服务器 load 就能达到 10-20,CPU 使用率 60% 以
代码优化日记 ——火焰图找问题代码
一、问题背景- 排序服务,用于推荐item分数预测,详细项目背景及排序请求执行逻辑可参考之前的一篇文章 :《[性能优化:线程资源回收](https://heapdump.cn/user/708
再聊 TCP backlog
这篇文章我们以 backlog 参数来深入研究一下建连的过程。通过阅读这篇文章,你会了解到下面这些知识:- backlog、半连接队列、全连接队列是什么- linux 内核是如何计算半连接队列、全连接
在被线上大量日志输出导致性能瓶颈毒打了很多次之后总结出的经验
由于线上业务量级比较大(日请求上亿,日活用户几十万),同时业务涉及逻辑很复杂,线上日志级别我们采用的是 info 级别,导致线上日志量非常庞大,经常遇到因为日志写入太慢导致的性能瓶颈(各微服务每小时日
收藏:一些比较好的Redis 性能优化思路总结
在一些网络服务的系统中,Redis 的性能,可能是比 MySQL 等硬盘数据库的性能更重要的课题。比如微博,把热点微博[1],最新的用户关系[2],都存储在 Redis 中,大量的查询击中 Redis
记一次线程池调优经历
原文链接:https://www.cnblogs.com/superfj/p/8313469.html作者:Janti 背景最近的一个项目需要用到招标,临时加了给我们的系统增加了一个性能需求,多少呢?
近期业务大量突增微服务性能优化总结-4.增加对于同步微服务的 HTTP 请求等待队列的监控
最近,业务增长的很迅猛,对于我们后台这块也是一个不小的挑战,这次遇到的核心业务接口的性能瓶颈,并不是单独的一个问题导致的,而是几个问题揉在一起:我们解决一个之后,发上线,之后发现还有另一个的性能瓶颈问
0
0