stone

Spark任务执行
这篇文章主要是总结一下Spark提交任务,执行的任务的流程。基本概念| 名词 | 说明 ...
扫描右侧二维码阅读全文
21
2016/12

Spark任务执行

这篇文章主要是总结一下Spark提交任务,执行的任务的流程。

基本概念

| 名词 | 说明 |
| :-: | :-: |
| client | 负责用户任务提交 |
| Driver | 负责作业控制 |
| Master | 主控节点 |
| Worker | 执行节点 |
| Executor | 执行单元 |
| BlockManager | RDD缓存管理器 |

用户通过client提交任务到Driver后,Driver通过分析RDD的依赖关系,绘制响应的DAG图,然后创建一系列的TaskSet,分发给不同的Executor进行处理。具体执行在不同的部署模式下略有不同。

这里需要注意,每一个Spark Application的运行环境都是独立的,不会互相影响。每一个Application的运行过程可以分为下面几个阶段:

  1. 构建应用运行时的环境
  2. 将程序转换为DAG
  3. 执行DAG
  4. 销毁运行环境

DAG

DAG(有向无环图),主要是描述了任务执行的流程,由RDD通过依赖关系生成。下面是一个DAG图:

一个DAG图由这几部分组成:rdd,partition,stage。

所有的操作都是在partition上进行的,而不是rdd,partition是spark操作的最小单位。

RDD的依赖关系分为窄依赖和宽依赖两种,宽依赖涉及数据混洗,需要等待所有父RDD准备就绪后才能进行,而窄依赖则不需要涉及数据混洗,可以采用流水线的方式进行处理(pipeline)。根据这种依赖关系,可以将不同的Task划分到不同的Stage中。

在生成DAG的过程是对执行Action操作的RDD进行回溯,根据RDD中的依赖关系(即对父节点的依赖)进行计算。开始的时候整个DAG划分为一个Stage(finalStage),然后从最后的一个RDD进行回溯,如果遇到窄依赖,则继续回溯,如果遇到宽依赖,则划分出一个新的Stage,然后继续回溯,直到完成。所以每一个Stage由一组Task组成。

实际上DAG的生成可能还涉及剪枝的问题,会比较复杂。

上面是一个wordcount的DAG图,这里主要阐述一个细节问题。上面的DAG由两个Stage组成,中间有一个Shuffled操作,第一个Stage由三个Task组成,通过pipeline的方式执行。在三个Task都执行完成之后才会执行Shuffled操作,然后下一个Stage读取Shuffled的结果,继续进行计算。而中间Shuffled的结果是怎么交给MapPartitionsRDD的呢?ShuffledRDD执行的中间结果是保存在硬盘中的,下一个阶段的MapPartitionsRDD则通过网络传输拉去到本地,然后再进行合并。很明显这样的操作会造成很大的性能损耗,所有在开发过程中尽量减少这样的操作。

执行过程

不同的部署方式的执行过程略有不同,但是大体上的概念是一致。

1.Standalone模式

在Standalone模式下,有两种deploy模式,主要区别在Driver运行在哪里,默认是client模式,即Driver运行在客户端上,另外一种是运行在Worker上。

无论那一种模式,运行起来还是差不多的。

在这里面有如下主要的进程:

Master:负责资源管理,命令Worker启动Executor和Driver

Worker:worker是工作节点的守护进程,负责向Master汇报心跳,启动Executor和Driver

Driver:一个作业(Application)的核心,负责DAG图的构建,Stage的划分,Task的管理和调度,生成SchedulerBackend等等任务。

Executor:真正执行任务的进程,每个Application一般会使用多个Worker,每个Worker上只会启动一个Executor进程,每个Executor受Driver控制,可以启动多个Task任务执行。

执行作业的流程整理如下:

  1. client提交任务到Master节点
  2. Master让一个Worker启动Driver,让其他的Worker启动Executor
  3. 负责启动Driver的Worker会启动DriverRunner线程,然后启动SchedulerBackend进程,SchedulerBackend中包含DAGScheduler,DAGScheduler根据作业构建DAG图和划分Stage,产生TaskSet,交给TaskScheduler进行处理。
  4. 其余负责启动Executor的Worker会启动ExecutorRunner线程,然后启动ExecutorBackend,并向SchedulerBackend进行注册,然后从TaskSchedule获取Task,并将执行情况汇报给SchedulerBackend
  5. 在执行的过程中,所有Worker都会向master发送心跳信息,然后master向Driver汇报各个worker的情况。

这里有几种故障情况的处理:

(1) Worker故障。因为心跳信号的丢失,Master知道这个Worker出现故障了,然后就将这个节点进行移除,并将情况汇报给Driver。

(2) Executor故障。此时ExecutorRunner会将故障情况汇报给Master,然后由Master给改worker发送重启启动Executor的指令。

(3) Master故障。这个主要通过zookeeper提供的HA解决

从上面大概可以看出,master主要负责资源的健康检查和管理,而Driver主要负责任务的执行和调度。

2.YARN模式

在YARN模式下执行的流程基本相似,上面的Spark Application Master相当与standalone中的SchedulerBackend,而Executor相当于standalone中的ExecutorBackend。

作业事件流

作业的过程可以概括如下:

  1. 操作RDD对象,如果遇到Action操作,则执行runJob指令,将任务提交到DAGScheduler。
  2. DAGScheduler进行回溯产生DAG图,然后通过submitTasks将任务提交到TaskScheduler。
  3. TaskScheduler将TaskSet通过某种方式分发到Executor上执行,当这个Task执行够会给TaskScheduler发送完成的消息,当这个TaskSet执行完之后,则会将DAGScheduler发送完成消息。
  4. Executor收到Task会执行LaunchTask操作,然后生成TaskRunner执行任务。

分发到Executor的Task有4个参数:targetRDD,partitions,func,listeners。

运行环境的准备

运行环境的准备可以分为两种方式,粗粒度和细粒度:

粗粒度就是在执行之前就将所有需要使用到的资源准备好,然后再执行任务。

细粒度就是在执行的过程中动态地申请资源。

在Spark1.5版本中,YARN模式只支持粗粒度的构建方式。

Last modification:September 7th, 2018 at 08:21 pm
If you think my article is useful to you, please feel free to appreciate

Leave a Comment