两种Scheduler
在创建SparkContext对象的时候,一个核心的是模块就是调度器(Scheduler),在spark中Scheduler有两种:
TaskScheduler(是低级的调度器接口)。TaskScheduler负责实际每个具体Task的物理调度。
DagScheduler(是高级的调度)。DAGScheduler负责将Task拆分成不同Stage的具有依赖关系(包含RDD的依赖关系)的多批任务,然后提交给TaskScheduler进行具体处理。DAG全称 Directed Acyclic Graph,有向无环图。简单的来说,就是一个由顶点和有方向性的边构成的图,从任意一个顶点出发,没有任何一条路径会将其带回到出发的顶点。
在作业调度系统中,调度的基础就在于判断多个作业任务的依赖关系,这些任务之间可能存在多重的依赖关系,也就是说有些任务必须先获得执行,然后另外的相关依赖任务才能执行,但是任务之间显然不应该出现任何直接或间接的循环依赖关系,所以本质上这种关系适合用DAG有向无环图来表示。
基本概念
Task任务:是在集群上运行的基本单位。一个Task负责处理RDD的一个partition。RDD的多个patition会分别由不同的Task去处理。
TaskSet任务集:一组关联的,但是互相之间没有Shuffle依赖关系的任务所组成的任务集
Stage调度阶段:一个任务集所对应的调度阶段
Job作业:一次RDD Action生成的一个或多个Stage所组成的一次计算作业
角色概念
Client:客户端进程,负责提交作业到Master。
Master:Standalone模式中主控节点,负责接收Client提交的作业,管理Worker,并命令Worker启动Driver和Executor。
Worker:Standalone模式中slave节点上的守护进程,负责管理本节点的资源,定期向Master汇报心跳,接收Master的命令,启动Driver和Executor。
Driver: 一个Spark作业运行时包括一个Driver进程,也是作业的主进程,负责作业的解析、生成Stage并调度Task到Executor上。包括DAGScheduler,TaskScheduler。
Executor:即真正执行作业的地方,一个集群一般包含多个Executor,每个Executor接收Driver的命令Launch Task,一个Executor可以执行一到多个Task。
工作流程
提交并运行一个Job的基本流程,包括以下步骤
划分Stage
当某个操作触发计算,向DAGScheduler提交作业时,DAGScheduler需要从RDD依赖链最末端的RDD出发,遍历整个RDD依赖链,划分Stage任务阶段,并决定各个Stage之间的依赖关系。Stage的划分是以ShuffleDependency为依据的,也就是说当某个RDD的运算需要将数据进行Shuffle时,这个包含了Shuffle依赖关系的RDD将被用来作为输入信息,构建一个新的Stage,由此为依据划分 Stage,可以确保有依赖关系的数据能够按照正确的顺序得到处理和运算。
以GroupByKey操作为例,该操作返回的结果实际上是一个ShuffleRDD,当DAGScheduler遍历到这个ShuffleRDD的时 候,因为其Dependency是一个ShuffleDependency,于是这个ShuffleRDD的父RDD以及 shuffleDependency等对象就被用来构建一个新的Stage,这个Stage的输出结果的分区方式,则由 ShuffleDependency中的Partitioner对象来决定。
可以看到,尽管划分和构建Stage的依据是ShuffleDependency,对应的RDD也就是这里的ShuffleRDD,但是这个Stage所 处理的数据是从这个shuffleRDD的父RDD开始计算的,只是最终的输出结果的位置信息参考了ShuffleRDD返回的 ShuffleDependency里所包含的内容。而shuffleRDD本身的运算操作(其实就是一个获取shuffle结果的过程),是在下一个 Stage里进行的。
生成Job,提交Stage
上一个步骤得到一个或多个有依赖关系的Stage,其中直接触发Job的RDD所关联的Stage作为FinalStage生成一个Job实例,这两者的 关系进一步存储在resultStageToJob映射表中,用于在该Stage全部完成时做一些后续处理,如报告状态,清理Job相关数据等。
具体提交一个Stage时,首先判断该Stage所依赖的父Stage的结果是否可用,如果所有父Stage的结果都可用,则提交该Stage,如果有任何一个父Stage的结果不可用,则迭代尝试提交父Stage。 所有迭代过程中由于所依赖Stage的结果不可用而没有提交成功的Stage都被放到waitingStages列表中等待将来被提交
什么时候waitingStages中的Stage会被重新提交呢,当一个属于中间过程Stage的任务(这种类型的任务所对应的类为 ShuffleMapTask)完成以后,DAGScheduler会检查对应的Stage的所有任务是否都完成了,如果是都完成了,则 DAGScheduler将重新扫描一次waitingStages中的所有Stage,检查他们是否还有任何依赖的Stage没有完成,如果没有就可以 提交该Stage。
此外每当完成一次DAGScheduler的事件循环以后,也会触发一次从等待和失败列表中扫描并提交就绪Stage的调用过程。
任务集的提交
每个Stage的提交,最终是转换成一个TaskSet任务集的提交,DAGScheduler通过TaskScheduler接口提交TaskSet, 这个TaskSet最终会触发TaskScheduler构建一个TaskSetManager的实例来管理这个TaskSet的生命周期,对于 DAGScheduler来说提交Stage的工作到此就完成了。而TaskScheduler的具体实现则会在得到计算资源的时候,进一步通过 TaskSetManager调度具体的Task到对应的Executor节点上进行运算
任务作业完成状态的监控
要保证相互依赖的job/stage能够得到顺利的调度执行,DAGScheduler就必然需要监控当前Job / Stage乃至Task的完成情况。这是通过对外(主要是对TaskScheduler)暴露一系列的回调函数来实现的,对于TaskScheduler 来说,这些回调函数主要包括任务的开始结束失败,任务集的失败,DAGScheduler根据这些Task的生命周期信息进一步维护Job和Stage的 状态信息。
此外TaskScheduler还可以通过回调函数通知DAGScheduler具体的Executor的生命状态,如果某一个Executor崩溃了, 或者由于任何原因与Driver失去联系了,则对应的Stage的shuffleMapTask的输出结果也将被标志为不可用,这也将导致对应Stage 状态的变更,进而影响相关Job的状态,再进一步可能触发对应Stage的重新提交来重新计算获取相关的数据。
任务结果的获取
一个具体的任务在Executor中执行完毕以后,其结果需要以某种形式返回给DAGScheduler,根据任务类型的不同,任务的结果的返回方式也不同
对于FinalStage所对应的任务(对应的类为ResultTask)返回给DAGScheduler的是运算结果本身,而对于 ShuffleMapTask,返回给DAGScheduler的是一个MapStatus对象,MapStatus对象管理了 ShuffleMapTask的运算输出结果在BlockManager里的相关存储信息,而非结果本身,这些存储位置信息将作为下一个Stage的任务 的获取输入数据的依据
而根据任务结果的大小的不同,ResultTask返回的结果又分为两类,如果结果足够小,则直接放在DirectTaskResult对象内,如果超过 特定尺寸(默认约10MB)则在Executor端会将DirectTaskResult先序列化,再把序列化的结果作为一个Block存放在 BlockManager里,而后将BlockManager返回的BlockID放在IndirectTaskResult对象中返回给 TaskScheduler,TaskScheduler进而调用TaskResultGetter将IndirectTaskResult中的 BlockID取出并通过BlockManager最终取得对应的DirectTaskResult。当然从DAGScheduler的角度来说,这些过 程对它来说是透明的,它所获得的都是任务的实际运算结果。
CIO之家 www.ciozj.com 公众号:imciow