性能文章>简单的聊一聊Spark的性能优化>

简单的聊一聊Spark的性能优化原创

355235

Spark的性能优化

参数级的优化

spark_driver_memory=4g
spark_num_executors=6
spark_executor_memory=4g
spark_executor_cores=1
spark_executor_memory_over_head=1024
spark_sql_shuffle_partitions=18
spark.default.parallelism=18

主要是这七个参数,这七个个参数的说明如下:

  • spark_driver_memory设置driver的内存大小
  • spark_num_executors设置executors的个数
  • spark_executor_memory设置每个spark_executor_cores的内存大小
  • spark_executor_cores设置每个executor的cores数目
  • spark_executor_memory_over_head设置executor执行的时候,用的内存可能会超过executor-memoy,所以会为executor额外预留一部分内存。该参数代表了这部分内存
  • spark_sql_shuffle_partitions设置executor的partitions个数,注意这个参数只对SparkSQL有用。
  • spark.default.parallelism设置executor的partitions个数,注意这个参数只对SparkRDD有用

对于这七个参数,需要充分理解Spark执行的逻辑才能明白并合适的配置,Spark的执行逻辑如下(这里不再细讲),可以参照这边博客https://www.cnblogs.com/cxxjohnson/p/8909578.html 或官方API
其中关于内存的配置要结合hadoop yarn的集群的资源情况而定,不是越大越好。而对于

spark_num_executors,spark_executor_cores,spark_sql_shuffle_partitions这三个参数,根据实际的经验需满足spark_sql_shuffle_partitions=spark_num_executorsspark_executor_cores,而spark_executor_cores一般保持在再提交任务时:这里写一写我经常用到的参数。

spark-submit \
--master yarn \
--deploy-mode cluster \
--num-executors $spark_num_executors \
--driver-memory $spark_driver_memory \
--executor-memory $spark_executor_memory \
--executor-cores $spark_executor_cores \
--queue yarn_queue_test \
--conf spark.app.name=spark_name_test \
--conf spark.yarn.executor.memoryOverhead=$spark_executor_memory_over_head \
--conf spark.core.connection.ack.wait.timeout=300 \
--conf spark.dynamicAllocation.enabled=false \
--jars test.jar 

Task数据分布的优化

在一般情况下Task数据分配是随机默认的,这样会带来一个问题,如果多大的Task,而只是部分的Task数据处理量大,大部分很小,那么如果能做到将小部分的Task数据处理量优化到和大部分的大致相等,那么性能自然就提升上去了。这样优化分为两步:

a.在执行的Java代码中获取num_executors参数的值,上面的例子是

spark_num_executors=6
 int rddPartition = Integer.parseInt(parameterParse.getNum_executors()) * 3;

b.不管是rdd的遍历还是直接的

session.sql("sql").foreachPartition()在遍历之前加上一个方法repartition(partition)
  session.sql(sqlStr).repartition(partition).foreachPartition(iterator -> {
            while (iterator.hasNext()) {
                Row row = iterator.next();
                //逻辑处理
                }
        });

这样做后,在任务的管理页面看到的executor数据分布式非常均匀的,从而提高性能

分而治之

分而治之是贯穿整个大数据计算的核心,不管是MapReduce,Spark,Flink等等,而这里要说的分而治之可以初略的物理流程上的分而治之,而不是对Spark的driver,executor,Task分而治之,因为本身就是分布式的分而治之。

假设经过反复的性能压力测试,得出Spark在现有规定资源上只有1000000条/s的性能,而现在的数据有一亿条。现在不做任何处理提交session.sql(“sql”).foreachPartition()或rdd.foreachPartition(),虽然最终会处理完,但发现时间是比预定的100000000/1000000s多得多,这样会拖累整体性能,这个时候是可以对现有的一亿条数据做以1000000条为组的组合切割分配成100000000/1000000个集合,对集合数据依次执行,这样性能上会有所提升。当然这种优化方式还是需要跟实际业务逻辑来定.

我这只是做了一些简单的总结1具体的还要具体问题具体分析。

其实我这只是个人的理解,如果有什么不对的希望大家的指正多谢大家。

广播变量

广播变量:Broadcast,将大变量广播出去,而不是直接使用

  • 为什么要用Broadcast

当进行随机抽取一些操作,或者从某个表里读取一些维度的数据,比如所有商品品类的信息,在某个算子函数中要使用到,加入该数据大小为100M,那么1000个task将会消耗100G的内存,集群损失不可估量

  • Broadcast的原理

默认的情况下,每个task执行的算子中,使用到了外部的变量,每个task都会获取一份变量的副本,所以会消耗很多的内存,进而导致RDD持久化内存不够等情况,大大影响执行速度。

广播变量,在driver上会有一份初始的副本,task在运行的时候,如果要使用广播变量中的数据,首先会在自己本地的Executor对应的BlockManager中尝试获取变量副本,并保存在本地的BlockManager中,此后这个Executor上的所有task,都会直接使用本地的BlockManager中的副本,Executor的BlockManager除了从driver上拉取,也可能从其他节点的BlockManager上拉取变量副本,距离越近越好。

总而言之: 广播变量的好处不是每一个task一份变量副本,而是变成每个节点的executor才一份副本,这样的话就可以变量产生的副本大大减少。

shuffle 调优

shuffle原理

什么情况下发生shuffle? 在Spark中,主要有以下几个例子:

  • groupByKey : 把分布在各个节点上的数据中的同一个key对应的value都集中到一块儿,集中到集群中的一个节点中,也即是集中到一个节点的executor的一个task中
  • reduceByKey : 算子函数对values集合进行reduce操作,最后生成一个value
  • countByKey : 在一个task中获取同一个key对应的所有value,然后计数,统计总共有多少value
  • join : 两个RDD,Key相同的两个value都集中到一个executor的task中

shuffle过程:

在一个shuffle过程中,前半部分stage中,每个task都会创建后半部分stage中相同task数量的文件,比如stage后半部分有100个task,那么前半部分的每个task都会创建100个文件(先写入到内存缓冲中,后溢满写入到磁盘),会将同一个key对应的values写入同一个文件中,shuffle后半部分的stage中的task,每个task都会从各个节点的task创建其中一份属于自己的那份文件中,拉取属于自己的key-value对,然后task会有一个内存缓冲区,然后调用HashMap进行key-values的聚合,最终调用我们定义的聚合函数来进行相应的操作

shuffle调优之合并map端输出文件

默认情况下,Spark是不开启合并map端输出文件机制的,所以当分批次执行task时,每批的task都会创建新的文件,而不会共用,大大影响了性能,所以当有大量map文件生成时,需要开启该机制

设置方法

new SparkConf().set(“spark.shuffle.consolidateFiles”, “true”)

  • 设置合并机制之后:

第一个stage,并行运行2个task,运行这两个task时会创建下一个stage的文件,运行完之后,会运行下一批次的2个task,而这一批次的task则不会创建新的文件,会复用上一批次的task创建的文件.

第二个stage的task在拉取上一个stage创建的文件时就不会拉取那么多文件了,而是拉取少量文件,每个输出文件都可能包含了多个task给自己的map端输出。

  • shuffle调优之map端内存缓冲和reduce内存占比。

默认情况下: 每个task的内存缓冲为32kb,reduce端内存占比为0.2(即默认executor内存中划分给reduce task的微20%) 所以在不调优的情况下,如果map端task处理的比较大,内存不足则溢满写入磁盘 比如: 每个task就处理320kb,32kb,总共会向磁盘溢写320 / 32 = 10次。每个task处理32000kb,32kb,总共会向磁盘溢写32000 / 32 = 1000次。同理,ruduce端也一样

何时调优?

通过Spark UI查看shuffle磁盘的read和write是不是很大,如果很大则应相应调优

如何调优?

spark.shuffle.file.buffer : 32kb -> 128kb spark.shuffle.memoryFraction: 0.2 -> 0.3

总结

这篇调优只是针对平时比较简单问题的调优,具有复杂的咱们还要具体问题具体分析。比方说对于RDD的优化,这些我在面试基本知识里面已经讲过,

点赞收藏
大数据球球

大数据技术布道者

请先登录,查看3条精彩评论吧
快去登录吧,你将获得
  • 浏览更多精彩评论
  • 和开发者讨论交流,共同进步
5
3