微信号:iteblog_hadoop

介绍:每两天一篇关于Hadoop、Flume、Spark、Hbase、Hive、Zookeeper、Mapreduce、HDFS相关的技术博文,大数据技术博客:http://www.iteblog.com,或者Google\百度搜索 过往记忆

Spark RDD上的map operators是如何pipeline起来的

2019-03-04 08:19 胡甫旺

本文原文(点击下面 阅读原文 即可进入):https://www.jianshu.com/p/45c9ee55eea6

最近在工作讨论中,同事提出了这么一个问题:作用在一个RDD/DataFrame上的连续的多个map是在对数据的一次循环遍历中完成的还是需要多次循环?

当时我很自然地回答说:不需要多次循环,spark会将多个map操作pipeline起来apply到rdd partition的每个data element上。

事后仔细想了想这个问题,虽然我确信spark不可能傻到每个map operator都循环遍历一次数据,但是这些map操作具体是怎么被pipeline起来apply的呢?这个问题还真不太清楚。于是乎,阅读了一些相关源码,力求把这个问题搞清楚。本文就是看完源码后的一次整理,以防过几天又全忘了。

我们从DAGScheduler的submitStage方法开始,分析一下map operators(包括map, filter, flatMap等) 是怎样被pipeline起来执行的。

submit stage

我们知道,spark的每个job都会被划分成多个stage,这些stage会被DAGScheduler以task set的形式提交给TaskScheduler以调度执行,DAGScheduler的submitStage方法实现了这一步骤。

如果当前stage没有missingParentStage(未完成的parent stages),submitStage会调用submitMissingTasks,这个方法是做了一些工作的,主要有:

1. 找到当前stage需要计算的partitions

stage的partitions就是其对应rdd的partitions,那么stage对应的rdd是怎么确定的呢?源码注释是这样解释的:

@param rdd RDD that this stage runs on: for a shuffle map stage, it's the RDD we run map tasks on, while for a result stage, it's the target RDD that we ran an action on

我的理解是:对于shuffle map stage,它的rdd就是引发shuffle的那个operator(比如reduceByKey)所作用的rdd;对于result stage,就是action(比如count)所作用的rdd.

2. 初始化当前stage的authorizedCommiters

一个partition对应一个task,当一个task完成后,它会commit它的输出结果到HDFS. 为了防止一个task的多个attempt都commit它们的output,每个task attempt在commit输出结果之前都要向OutputCommitCoordinator请求commit的permission,只有获得批准的attempt才能commit. 批准commit的原则是: "first committer wins" . 

在submitMissingTasks方法中会把当前stage的所有partitions对应的tasks的authorizedCommitter都设置为-1,也就是还没有获批的committer.

3. 获取每个需要计算的partitions的preferred location

根据每个partition的数据locality信息获取对应task的preferred locations.

4. 序列化并广播taskBinary

taskBinary包含了执行task所需要的信息(包括数据信息,代码信息)。对于不同的task type,taskBinary包含的信息有所不同。

spark有两种类型的task : shuffle map task和result task, 与上面提到的shuffle map stage和result stage相对应。

shuffle map task的作用是把rdd的数据划分到多个buckets里面,以便shuffle过程使用。这里的划分是依据shuffleDependency中指定的partitioner进行的,所以shuffle map task的taskBinary反序列化后的类型是(RDD[_], ShuffleDependency[_, _, _])

result task的作用是在对应的rdd partition上执行指定的function,所以result task的taskBinary反序列化后的类型是(RDD[T], (TaskContext, Iterator[T]) => U)

生成taskBinary的代码:

5. 生成tasks

result stage生成result tasks,shuffle map stage生成shuffle map tasks。有多少个missing partition,就会生成多少个task。可以看到taskBinary被作为参数用于构建task对象。

6. 构建task set并向taskScheduler提交

spark map operators如何被pipeline的

通过上面的分析,我们知道rdd的map operators最终都会被转化成shuffle map task和result task,然后分配到exectuor端去执行。那么这些map operators是怎么被pipeline起来执行的呢?也就是说shuffle map task和result task是怎么把这些operators串联起来的呢?

为了回答这个问题,我们还需要阅读一下ShuffleMapTask和ResultTask的源码 : 

shuffle map task和result task都会对taskBinary做反序列化得到rdd对象并且调用rdd.iterator函数去获取对应partition的数据。我们来看看rdd.iterator函数做了什么:

rdd.iterator调用了rdd.getOrCompute

getOrCompute会先通过当前executor上的blockManager获取指定block id的block,如果block不存在则调用computeOrReadCheckpoint,computeOrReadCheckpoint会调用compute方法进行计算,而这个compute方法是RDD的一个抽象方法,由RDD的子类实现。

因为filter, map, flatMap操作生成的RDD都是MapPartitionsRDD, 所以我们以MapPartitionsRDD为例:

可以看到,compute方法调用了parent RDD的iterator方法,然后apply了当前MapPartitionsRDD的f参数. 那这个f又是什么function呢?我们需要回到RDD.scala中看一下map, filter, flatMap的code:

从上面的源码可以看出,MapPartitionsRDD中的f函数就是对parent rdd的iterator调用了相同的map函数以执行用户给定的function。

所以这是一个逐层嵌套的rdd.iterator方法调用,子rdd调用父rdd的iterator方法并在其结果之上调用scala.collection.Iterator的map函数以执行用户给定的function,逐层调用,直到调用到最初的iterator(比如hadoopRDD partition的iterator)。

现在,我们最初的问题:“多个连续的spark map operators是如何pipeline起来执行的?” 就转化成了“scala.collection.Iterator的多个连续map操作是如何pipeline起来的?”

scala.collection.Iterator的map operators是怎么构成pipeline的?

看一下scala.collection.Ierator中map, filter, flatMap函数的源码:

从上面的源码可以看出,Iterator的map, filter, flatMap方法返回的Iterator就是基于当前Iterator (self)override了next和hasNext方法的Iterator实例。比如,对于map函数,结果Iterator的hasNext就是直接调用了self iterator的hasNext,next方法就是在self iterator的next方法的结果上调用了指定的map function。

flatMap和filter函数稍微复杂些,但本质上一样,都是通过调用self iterator的hasNext和next方法对数据进行遍历和处理。

所以,当我们调用最终结果iterator的hasNext和next方法进行遍历时,每遍历一个data element都会逐层调用父层iterator的hasNext和next方法。各层的map function组成了一个pipeline,每个data element都经过这个pipeline的处理得到最终结果数据。

总结

  • 对RDD的operators最终会转化成shuffle map task和result task在exectuor上执行。

  • 每个task (shuffle map task 或 result task)都会被分配一个taskBinary,taskBinary以broadCast的方式分发到每个executor,每个executor都会对taskBinary进行反序列化,得到对应的rdd,以及对应的function或shuffle dependency(function for result task, shuffle dependency for shuffle map task)。

  • task通过调用对应rdd的iterator方法获取对应partition的数据,而这个iterator方法又会逐层调用父rdd的iterator方法获取数据。这一过程底层是通过覆写scala.collection.iterator的hasNext和next方法实现的。

  • RDD/DataFrame上的连续的map, filter, flatMap函数会自动构成operator pipeline一起对每个data element进行处理,单次循环即可完成多个map operators, 无需多次遍历。


说明

  • 本文源码均是apache spark 2.1.1版本。

  • 本文只讨论了像filter, map, flatMap这种依次处理每个data element的map operators,对于像mapPartitions这种对partition进行处理的operator未做讨论。

  • 如有错误,敬请指正。

猜你喜欢

欢迎关注本公众号:iteblog_hadoop:

回复 spark_summit_201806 下载 Spark Summit North America 201806 全部PPT

spark_summit_eu_2018 下载 Spark+AI Summit europe 2018 全部PPT

回复 HBase_book 下载 2018HBase技术总结 专刊

0、回复 电子书 获取 本站所有可下载的电子书

1、为什么不建议在 HBase 中使用过多的列族

2、Elasticsearch 6.3 发布,你们要的 SQL 功能来了

3、列式存储和行式存储它们真正的区别是什么

4、分布式原理:一致性哈希算法简介

5、分布式快照算法: Chandy-Lamport 算法

6、Kafka分区分配策略

7、分布式原理:一文了解 Gossip 协议

8、干货 | Apache Spark 2.0 作业优化技巧

9、HBase Rowkey 设计指南

10、HBase 入门之数据刷写详细说明

11、更多大数据文章欢迎访问https://www.iteblog.com及本公众号(iteblog_hadoop)
12、Flink中文文档:
http://flink.iteblog.com
13、Carbondata 中文文档
http://carbondata.iteblog.com

 
Hadoop技术博文 更多文章 分布式键值存储 Dynamo 的实现原理 HBase 中加盐之后的表如何读取:Spark 篇 听说你好不容易写了个爬虫,结果没抓几个就被封了? 分布式快照算法: Chandy-Lamport 算法 HBase 协处理器入门及实战
猜您喜欢 致传智学子的一封信 视野三 移动领域即时通讯技术选型之开源篇 StefanLage/SLPagingView Python基础教程17:日期和时间 Programming iOS9 翻译 - Views