Spark性能优化——和shuffle搏斗

  • 时间:
  • 浏览:0
  • 来源:uu快3新平台_uu快3诀窍_讨论群

首那么明确哪些是shuffle。Shuffle指的是从map阶段到reduce阶段转换的之后,即map的output向着reduce的input映射的之后,并不节点一一对应的,即干map工作的slave A,它的输出可能性要分散跑到reduce节点A、B、C、D …… X、Y、Z去,就好像shuffle的字面意思“洗牌”一样,哪些map的输出数据要打散你你是什么根据新的路由算法(比如对key进行你是什么hash算法),发送到不同的reduce节点上去。(下面这幅图来自《Spark Architecture: Shuffle》)

先去重,再合并

ps:可能性他们 像尝试一下spark的数率,都都要使用一下阿里云的 E-MapReduce

用reduceByKey代替groupByKey

之后写过一篇文章,比较了几种不同场景的性能优化,包括portal的性能优化,web service的性能优化,还有Spark job的性能优化。Spark的性能优化有你你是什么特殊的地方,比如实时性一般找不到考虑范围之内,通常他们 用Spark来处理的数据,后要 要求异步得到结果的数据;再比如数据量一般都很大,要不然就说 还可否必要在集群上操纵不还可否一还有一个他们 伙,等等。事实上,他们 都知道不还可否银弹,你你是什么每你是什么性能优化场景后要 你你是什么特定的“大boss”,通扎住住和处理大boss之后,能处理其中一大每段问题。比如对于portal来说,是页面静态化,对于web service来说,是高并发(当然,这你是什么都都要说并不确切,这就说 我针对我参与的项目总结的经验而已),而对于Spark来说,你你是什么大boss就说 我shuffle。

1

明确哪些操作都要在master完成

2

A.join(B).map{case (key, (valueA, valueB)) => doSomething(valueB, valueA)}

A.filter{case (name, age, sex) => b.values.contains(age)}

2

1

你就都都要想象,执行的之后超大的A被打散和下发到各个节点去。你你是什么更要命的是,为了恢复一之后之后刚开始 的(name, age, sex)的型态,又做了一次map,而这次map一样意味shuffle。两次shuffle,太疯狂了。你你是什么可能性原先写:

可能性上端结果rdd可能性被调用多次,都都要显式调用cache()和persist(),以告知Spark,保留当前rdd。当然,即便不必还可否做,Spark依然存放不久前计算过的结果(以下来自官方指南):

要减少shuffle的开销,主要有一还有一个思路:

 .join(B)

A.collect.foreach(println)

Spark also automatically persists some intermediate data in shuffle operations (e.g. reduceByKey), even without users calling persist. This is done to avoid recomputing the entire input if a node fails during the shuffle. We still recommend users call persist on the resulting RDD if they plan to reuse it.

一次shuffle都不还可否,A老老实实待着不动,等着全量的B被下发过来。

另外,还有三根同类的是用treeReduce来代替reduce,主就说 我用于单个reduce操作开销比较大,都都要条件treeReduce的深度图来控制每次reduce的规模。

有你你是什么同类的xxxByKey操作,都比groupByKey好,比如foldByKey和aggregateByKey。

A.mapValues{case ((B, C), (D, E)) => (B, C, E)}

你你是什么条应该是比较经典的了。reduceByKey会在当前节点(local)中做reduce操作,也就说 我说,会在shuffle前,尽可能性地减小数据量。而groupByKey则后要 ,它会不做任何处理而直接去shuffle。当然,有你你是什么场景下,功能上二者并不还可否互相替换。可能性reduceByKey要求参与运算的value,你你是什么和输出的value类型要一样,你你是什么groupByKey则不还可否你你是什么要求。

在工作中遇到原先一还有一个问题,都要转打上去原先一还有一个非常巨大的RDD A,型态是(countryId, product),key是国家id,value是商品的具体信息。当时在shuffle的之后,你你是什么hash算法是根据key来确定节点的,你你是什么事实上你你是什么countryId的分布是极其不均匀的,大每段商品后要 美国(countryId=1),于是他们 通过Ganglia看一遍,其中一台slave的CPU有点儿高,计算详细聚集到那一台去了。

mapValues比map好

1

A.distinct().union(B.distinct()).distinct()

Spark的性能分析和调优很有意思,今天再写一篇。主要话题是shuffle,当然也牵涉你你是什么你你是什么代码上的小把戏。

想把RDD的内容逐条打印出来,你你是什么结果却不还可否再次冒出在stdout上端,可能性你你是什么步操作被倒进slave上端去执行了。虽然只都要collect一下,哪些内容就被加载到master的内存中打印了:

再比如,可能性遇到RDD操作嵌套的情况报告,通常考虑优化掉,可能性不还可否master不还可否去理解和执行RDD的操作,slave不还可否处理被分配的task而已。比如:

转自:http://www.raychase.net/3788

就都都要用join来代替:

A.map{case (keyA, valueA) => doSomething(B.lookup(keyA).head, valueA)}

找到意味之后,问题处理就容易了,要么处理你你是什么shuffle,要么改进一下key,让它的shuffle不必还可否均匀分布(比如都都要拿countryId+商品名称的tuple作key,甚至生成一还有一个随机串)。

 .map{case (age, ((name, sex), title)) => (name, age, sex)}

可能性想打印你你是什么东西到stdout里去:

比如有A、B原先一还有一个规模比较大的RDD,可能性本人内部内部结构有少许重复,不还可否二者一合并,再去重:

现在我都都要从全国人民中找出哪些有称号的人来。可能性直接写成:

数据量大,之所以慢。通常情况报告下,可能性Spark的job是倒进内存上端进行运算的,你你是什么一还有一个复杂的map操作不一定执行起来那么快。你你是什么可能性牵涉到shuffle,这上端有网络传输和序列化的问题,后要 可能性非常慢。

明确key不必变的map,就用mapValues来替代,可能性原先都都要保证Spark不必shuffle你的数据:

原先的操作之所以正确,你你是什么可能性都都要先本人去重,再合并,再去重,都都要大幅度减小shuffle的开销(注意Spark的默认union和Oracle上端的“union all”很像——不去重):

1

A.map{case (A, ((B, C), (D, E))) => (A, (B, C, E))}

链接地址为:https://emr.console.aliyun.com/#/cluster/region/cn-hangzhou

3

为哪些说shuffle是Spark job的大boss,就说 我可能性Spark你是什么的计算通常后要 在内存中完成的,比如原先一还有一个map型态的RDD:(String, Seq),key是字符串,value是一还有一个Seq,可能性就说 我对value进行一一映射的map操作,比如(1)先计算Seq的长度,(2)再把你你是什么长度作为元素打上去到Seq上端去。这两步计算,都都都要在local完成,而事实上也是在内存中操作完成的,换言之,不都要跑到别的node上去拿数据,你你是什么执行的数率是非常快的。你你是什么,可能性对于一还有一个大的rdd,shuffle居于的之后,就会可能性网络传输、数据序列化/反序列化产生少许的磁盘IO和CPU开销。你你是什么性能上的损失是非常巨大的。

1

不均匀的shuffle

用broadcast + filter来代替join

1

1

同类地,还有filter等等操作,目的也是要先对大的RDD进行“瘦身”操作,你你是什么在做你你是什么操作。

A.map{case (name, age, sex) => (age, (name, sex))}

改成:

val b = sc.broadcast(B.collectAsMap)

你你是什么优化是你是什么特定场景的神器,就说 我拿大的RDD A去join一还有一个小的RDD B,比如有原先一还有一个RDD:

另外,在Spark SQL上端直接有BroadcastHashJoin,也是把小的rdd广播出去。

看起来变复杂了对不对,你你是什么当时我处理你你是什么问题的之后,用第二种法子时间开销从六个小时减到20分钟。

1