军事
shuffle怎么用(二十、图解Spark的Shuffle原理)

1.概述

在MR框架中,Shuffle是连接Mapper和Reducer之间的桥梁,Map阶段通过shuffle读取数据并输出到对应的Reduce, 而Reduce阶段负责从Map端拉取数据并行计算。在整个shuffle过程中,往往伴随着大量的磁盘IO和网络的数据传输。所以,shuffle性能的高低也直接决定了整个应用程序性能的高低。相比较于MR的shuffle, Spark框架也有自己的独特的shuffle实现过程。

2.Hadoop的Mapreduce Shuffle原理

  • Shuffle过程中,提供数据的一端被称作Map端,Map端每个生成数据的任务称为Mapper
  • 接收数据的一端被称作reduce端,reduce端负责拉取数据,拉取的数据被称为Reducer
  • Shuffle过程的本质上是将Map端处理的数据使用分区器进行划分,并将这些数据落地磁盘,并由Reducer端负责拉取属于自己分区的数据的过程。

MR


  • shuffle write
    • 将每个task处理的数据按key进行分区,即对相同的key应用hash算法,从而将相同的key写入同一个磁盘文件中,而每一个磁盘文件都只属于reduce端的stage的一个task
  • shuffle read
    • stage的每一个task就需要将上一个stage的计算结果中所有相同的key, 从各个节点上通过网络传输都拉取到自己所在的节点上,然后进行key的聚合或连接等操作,shuffle read的拉取过程是一边拉取一边进行聚合
  • Hash Shuffle普通机制存在的问题
    • shuffle前在磁盘上会产生海量的数据小文件(M*R, 即map task的数量*reduce task的数量),建立通信和拉取数据的次数就会变多,此时会产生大量耗时的IO操作,可能因为网络原因或者会产生OOM风险。

4.2.Hash Shuffle的合并机制

优化的Hash Shuffle

  • 每一个Executor进程根据核数,决定Task的并发数量,比如executor核数是2,就是可以并发运行两个task,如果是一个则只能运行一个task。
  • 假设executor核数是1,MapTask数量是M,那么它依然会根据ResultTask的数量R,创建R个buffer缓存,然后对key进行hash,数据进入不同的buffer中,每一个buffer对应着一个block file,用于刷新buffer缓存里的数据。
  • 然后下一个task运行的时候,那么不会再创建新的buffer和block file,而是复用之前的task已经创建好的buffer和block file。即同一个Executor进程里所有Task都会把相同的key放入相同的buffer缓冲区中。
  • 这样的话,生成文件的数量就是(本地worker的executor数量*executor的cores*ResultTask数量)如上图所示,即2 * 1* 3 = 6个文件,每一个Executor的shuffle MapTask数量100,ReduceTask数量为100,那么未优化的HashShuffle的文件数是2 *1* 100*100 =20000,优化之后的数量是2*1*100 = 200文件,相当于少了100倍。
  • 缺点:如果 Reducer 端的并行任务或者是数据分片过多的话则 Core * Reducer Task 依旧过大,也会产生很多小文件。

4.3 Sort Shuffle普通机制

对于Bypass机制,使用这个模式需要考虑几下几点:

  • 主要用于处理不需要排序和聚合的Shuffle操作,所以数据是直接写入文件,数据量较大的时候,网络I/O和内存负担较重
  • 主要适合处理Reducer任务数量比较少的情况下
  • 将每一个分区写入一个单独的文件,最后将这些文件合并,减少文件数量;但是这种方式需要并发打开多个文件,对内存消耗比较大。
  • 因为bypass机制这种方式不需要排序,所以效率比Sort Shuffle的普通机制高,所以在reduce端数量不大,又不需要在map端做聚合和排序,并且shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值, 就用这种方式。

顶一下()     踩一下()

热门推荐

发表评论
0评