数据倾斜是一种很常见的问题(依据二八定律),简单来说,比方WordCount中某个Key对应的数据量非常大的话,就会产生数据倾斜,导致两个后果:
- OOM(单或少数的节点);
- 拖慢整个Job执行时间(其他已经完成的节点都在等这个还在做的节点)
-
聚合倾斜
-
双重聚合(局部聚合+全局聚合)
场景: 对RDD进行reduceByKey等聚合类shuffle算子,SparkSQL的groupBy做分组聚合这两种情况 思路:首先通过map给每个key打上n以内的随机数的前缀并进行局部聚合,即(hello, 1) (hello, 1) (hello, 1) (hello, 1)变为(1_hello, 1) (1_hello, 1) (2_hello, 1),并进行reduceByKey的局部聚合,然后再次map将key的前缀随机数去掉再次进行全局聚合; 原理: 对原本相同的key进行随机数附加,变成不同key,让原本一个task处理的数据分摊到多个task做局部聚合,规避单task数据过量。之后再去随机前缀进行全局聚合; 优点:效果非常好(对聚合类Shuffle操作的倾斜问题); 缺点:范围窄(仅适用于聚合类的Shuffle操作,join类的Shuffle还需其它方案)
-
-
join倾斜
-
将reduce join转为map join
场景: 对RDD或Spark SQL使用join类操作或语句,且join操作的RDD或表比较小(百兆或1,2G); 思路:使用broadcast和map类算子实现join的功能替代原本的join,彻底规避shuffle。对较小RDD直接collect到内存,并创建broadcast变量;并对另外一个RDD执行map类算子,在该算子的函数中,从broadcast变量(collect出的较小RDD)与当前RDD中的每条数据依次比对key,相同的key执行你需要方式的join;
原理: 若RDD较小,可采用广播小的RDD,并对大的RDD进行map,来实现与join同样的效果。简而言之,用broadcast-map代替join,规避join带来的shuffle(无Shuffle无倾斜); 优点:效果很好(对join操作导致的倾斜),根治;
缺点:适用场景小(大表+小表),广播(driver和executor节点都会驻留小表数据)小表也耗内存
-
采样倾斜key并分拆join操作
场景: 两个较大的(无法采用方案五)RDD/Hive表进行join时,且一个RDD/Hive表中少数key数据量过大,另一个RDD/Hive表的key分布较均匀(RDD中两者之一有一个更倾斜); 思路:
- 对更倾斜rdd1进行采样(RDD.sample)并统计出数据量最大的几个key;
- 对这几个倾斜的key从原本rdd1中拆出形成一个单独的rdd1_1,并打上0~n的随机数前缀,被拆分的原rdd1的另一部分(不包含倾斜key)又形成一个新rdd1_2;
- 对rdd2过滤出rdd1倾斜的key,得到rdd2_1,并将其中每条数据扩n倍,对每条数据按顺序附加0~n的前缀,被拆分出key的rdd2也独立形成另一个rdd2_2; 【个人认为,这里扩了n倍,最后union完还需要将每个倾斜key对应的value减去(n-1)】
- 将加了随机前缀的rdd1_1和rdd2_1进行join(此时原本倾斜的key被打散n份并被分散到更多的task中进行join); 【个人认为,这里应该做两次join,两次join中间有一个map去前缀】
- 另外两个普通的RDD(rdd1_2、rdd2_2)照常join;
- 最后将两次join的结果用union结合得到最终的join结果。 原理:对join导致的倾斜是因为某几个key,可将原本RDD中的倾斜key拆分出原RDD得到新RDD,并以加随机前缀的方式打散n份做join,将倾斜key对应的大量数据分摊到更多task上来规避倾斜;
优点: 前提是join导致的倾斜(某几个key倾斜),避免占用过多内存(只需对少数倾斜key扩容n倍); 缺点: 对过多倾斜key不适用。
-
用随机前缀和扩容RDD进行join
场景: RDD中有大量key导致倾斜; 思路:与方案六类似。
- 查看RDD/Hive表中数据分布并找到造成倾斜的RDD/表;
- 对倾斜RDD中的每条数据打上n以内的随机数前缀;
- 对另外一个正常RDD的每条数据扩容n倍,扩容出的每条数据依次打上0到n的前缀;
- 对处理后的两个RDD进行join。
原理: 与方案六只有唯一不同在于这里对不倾斜RDD中所有数据进行扩大n倍,而不是找出倾斜key进行扩容; 优点: 对join类的数据倾斜都可处理,效果非常显著; 缺点: 缓解,扩容需要大内存
-