Spark 源码 | 调度篇

Posted by Aiden on December 9, 2019

本文通过阅读 Spark schedule 相关的代码,主要来分析 spark 是如何将 job 分解为 stage, 并按顺序去调度执行的。

参考的 spark 版本为 2.3.1.

文章主要先依据作业的执行生命流程介绍实现大致思路, 再到结合源码分析不同模块承担的责任与实现细节。

有关具体的细节可以阅读 spark/core/src/main/scala/org/apache/spark/scheduler at v2.3.1 · apache/spark · GitHub


调度流程

我们知道在 spark 中算子分为两类, Transformation算子与 Action 算子。

我们通过 Transformantion 将我们的原始数据集(RDD)进行一系列的转换,过滤,聚合等的操作转变成我们想要的结果,然后通过 Action来将最终结果集放到合适的位置或者只是简单的展示等。

Action 同时具有触发整个Job的作用,当程序执行到 Action 操作时,整个job才正式开始触发运行。

image.png

当一个作业被提交执行后,每当遇到一个 Action, 程序会创建一个 Job 去执行。 如下:

val sc = new SparkContext(new SparkConf())
val rdd = sc.textFile(path)
val flatmapRdd = rdd.flatMap(_.split(" "))
val mapRdd = flatmapRdd.map(str =>(str, 1))
val resultRdd = mapRdd.reduceByKey(_+_)
val result = resultRdd.count()

每当调用一次 Transformation 就会产生一个 RDD, Job 会根据这些 Transformation 的依赖关系去构建 RDD 的依赖关系。

然后Job通过RDD的依赖创建 Stage, ShuffledRDD 是用来划分Stage的边界。如上只产生了两个Stage.

Stage提交是自下而上进行的, 提交Stage时,如果发现这个Stage有其他的依赖, 那么就遍历依赖的Stage并校验有无依赖Stage,直到找到没有依赖的Stage去提交执行。

Stage执行先跟据其内部的RDD partition 的数量创建指定的Task, 并将这些Task使用TaskSetManager维护并放到调度池Pool中。

后面再去资源管理中心SchedulerBackend 去申请资源, 拿到当前剩余 executor 的 core数量后将Pool中的所有TaskSetManager按序取出, 将每个TaskSetManager中的Task放到相应executor的core上去执行,直到资源不足或者是task都已经提交完。

说明 :

  1. 如果 SchedulerBackend 发现有 Task 执行完,会告知 Pool 提交新的 Task 进来。
  2. 如果 TaskSetManager 提交task时, 如果发现所有的task都已经提交完, 则告知上层, 提交其他stage.

调度实现

在实现方面, 整个调度部分主要分为三部分:

  1. DAGScheduler : 主要管理 Job 中 Stage 的分割与执行
  2. TaskScheduler : 主要维护 Task 调度执行信息
  3. SchedulerBackend : 主要负责资源维护, 与作业的执行

image.png

DAGScheduler

DAGScheduler 主要负责 Stage 层面的调度任务, 主要工作是将 Job 解析成 Stage, 并根据依赖关系提交 Stage 去执行。

其内部维护一个 DAGSchedulerEventProcessLoop 的事件处理单线程类, 主要用于响应 DAGScheduler 提交的任务请求。

image.png

作业的提交

Action 算子是提交Job的入口,最终将会调用 DAGScheduler.handleJobSubmitted(jobId: Int, finalRDD: RDD[_], func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], callSite: CallSite, listener: JobListener, properties: Properties)

比如 RDD.count() 定义为 def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum, 如上所示, 最终所有的 Action 都会转到 SparkContext.runJob

def runJob[T, U: ClassTag](
    rdd: RDD[T],                             // 这个 RDD 为action前一个 transfoarmation 算子产生的RDD
                                             // 是整个 JOB 拓扑中的最后一个RDD
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}

最终在DAGScheduler中通过JobSubmitted事件,将作业提交给 DAGSchedulerEventProcessLoop 去处理

Stage 划分

Job 创建以后首先要解析构建 stage.

private def createResultStage(
    rdd: RDD[_],                               // job 最后一个 RDD
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}

DAGScheduler 构建 Stage 的过程是通过递归调用的, 它会检查 RDD 的依赖性, RDD 的 ShuffleDependency 是切分stage的边界。 如果被切分的stage 有依赖性,则先创建被依赖的父stage。

stage 的结构如下所示。

private[scheduler] abstract class Stage(
    val id: Int,              // stage Id
    val rdd: RDD[_],          // 这个stage 中最后一个rdd
    val numTasks: Int,
    val parents: List[Stage], // 依赖的父 stage
    val firstJobId: Int,
    val callSite: CallSite)
  extends Logging {
    ....
  }

递归构建过程的入口在getOrCreateParentStages 中实现, 这是一个构建父stage的过程, 它首先检查这个 stage 的RDD的dependencies 有没有 ShuffleDependency

private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
    getShuffleDependencies(rdd).map { shuffleDep =>
      getOrCreateShuffleMapStage(shuffleDep, firstJobId)
    }.toList
  }

getShuffleDependencies 深度优先遍历, 找到这个rdd的所有依赖中的上一层 ShuffleDependency.

private[scheduler] def getShuffleDependencies(
    rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
  val parents = new HashSet[ShuffleDependency[_, _, _]]
  val visited = new HashSet[RDD[_]]
  val waitingForVisit = new ArrayStack[RDD[_]]
  waitingForVisit.push(rdd)
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.pop()
    if (!visited(toVisit)) {
      visited += toVisit
      toVisit.dependencies.foreach {
        case shuffleDep: ShuffleDependency[_, _, _] =>
          parents += shuffleDep
        case dependency =>
          waitingForVisit.push(dependency.rdd)
      }
    }
  }
  parents
}

如果能找到依赖的RDD,则构建对应RDD的 ShuffleMapStage, 这个过程又会回到上面,直到没有依赖的stage.

 def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
  ...
  val parents = getOrCreateParentStages(rdd, jobId)
  val stage = new ShuffleMapStage(id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)
  ...
  stage
}

构建中,除了最后一个RDD构建出的是 ResultStage, 剩下都是ShuffleMapStage.

image.png

Job 提交后首先触发submitStage的是ResultStage.

private def submitStage(stage: Stage) {
    ...
      
    val missing = getMissingParentStages(stage).sortBy(_.id) // 获取这个 stage 依赖的 父 stage

    if (missing.isEmpty) {
      // 如果没有依赖的父 stage, 则正式提交
      submitMissingTasks(stage, jobId.get)
    } else {
      // 如果存在父 stage , 则尝试提交父stage, 并把自己添加到等待队列里
      for (parent <- missing) { 
        submitStage(parent)
      }

      waitingStages += stage
    }
}

举例 :

image.png

这样递归就把没有依赖的 stage 通过 submitMissingTasks 正式提交出去。

stage 的提交

stage 提交执行之前, 首先需要将 stage 转换成 task。 因为在 spark 中,task 才是任务计算的单元, task 中封装这个 stage 的执行过程, 每一个 partition 对应一个task. 根据stage 类型的不同, task 可以分成 ShuffleMapTaskResultTask

/** Called when stage's parents are available and we can now do its task. */
  private def submitMissingTasks(stage: Stage, jobId: Int) {

    // 根据 partition 计算 task 的主机偏好
    val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
      stage match {
        case s: ShuffleMapStage =>
          partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
        case s: ResultStage =>
          partitionsToCompute.map { id =>
            val p = s.partitions(id)
            (id, getPreferredLocs(stage.rdd, p))
          }.toMap
      }
    } catch {
        。。。
        return
    }
    ....
    // 根据 partition 创建 task
    val tasks: Seq[Task[_]] = try {
      val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
      stage match {
        case stage: ShuffleMapStage =>
          stage.pendingPartitions.clear()
          partitionsToCompute.map { id =>
            val locs = taskIdToLocations(id)
            val part = partitions(id)
            stage.pendingPartitions += id
            new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
              taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
              Option(sc.applicationId), sc.applicationAttemptId)
          }

        case stage: ResultStage =>
          partitionsToCompute.map { id =>
            val p: Int = stage.partitions(id)
            val part = partitions(p)
            val locs = taskIdToLocations(id)
            new ResultTask(stage.id, stage.latestInfo.attemptNumber,
              taskBinary, part, locs, id, properties, serializedTaskMetrics,
              Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
          }
      }
    } catch {
      ...
    }

    if (tasks.size > 0) {
      // task 提交给 taskScheduler 去处理
      taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
    } else {
      // 提交依赖 stage 的子 stage 集合
      markStageAsFinished(stage, None)

      submitWaitingChildStages(stage)
    }
  }

TaskScheduler

TaskScheduler 主要负责Task的提交,管理, 以及 executor 的信息维护等。在 Spark 2.3.1 里面, 实现 TaskScheduler 的是 TaskSchedulerImpl.

TaskSchedulerImpl 调度 Task 主要依靠作业调度池 Pool.

val rootPool: Pool = new Pool("", schedulingMode, 0, 0)
...
schedulableBuilder = {
  schedulingMode match {
    case SchedulingMode.FIFO =>
      new FIFOSchedulableBuilder(rootPool)
    case SchedulingMode.FAIR =>
      new FairSchedulableBuilder(rootPool, conf)
    case =>
      throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
      s"$schedulingMode")
  }
}
schedulableBuilder.buildPools()
Pool

Pool 是一个Task调度池, 它决定了需要调度的Task的资源获取顺序。它的调度单元是TaskSetManager.

getSortedTaskSetQueue 方法用于按优先级对TaskSetManager进行排序。只有前面的TaskSetManager完成了资源获取, 后面的作业才有机会进行调度。

override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager]

image.png

TaskSetManager 主要用于维护一个 stage 中的所有 Task,并根据提供的机器信息决定提交哪个task上去。

初始化时, 将所有的task放到等待提交task集合里
for (i <- (0 until numTasks).reverse) {
  addPendingTask(i)
}
...
private[spark] def addPendingTask(index: Int) {
  for (loc <- tasks(index).preferredLocations) {
    loc match {
      case e: ExecutorCacheTaskLocation =>
        pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
      case e: HDFSCacheTaskLocation =>
        val exe = sched.getExecutorsAliveOnHost(loc.host)
        exe match {
          case Some(set) =>
            for (e <- set) {
              pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
            }
            logInfo(s"Pending task $index has a cached location at ${e.host} " +
              ", where there are executors " + set.mkString(","))
          case None => logDebug(s"Pending task $index has a cached location at ${e.host} " +
              ", but there are no executors alive there.")
        }
      case _ =>
    }
    pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
    for (rack <- sched.getRackForHost(loc.host)) {
      pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) += index
    }
  }

  if (tasks(index).preferredLocations == Nil) {
    pendingTasksWithNoPrefs += index
  }

  allPendingTasks += index  // No point scanning this whole list to find the old task there
}

Pool 的构建依赖于 SchedulableBuild.

  • FIFOSchedulableBuilder

image.png

FIFOSchedulableBuilder 构建的调度池调度策略较为单一, 不同TaskSetManger间使用priority作为优先级顺序,如果priority 相同,则使用stageId 作为优先级顺序。

private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val priority1 = s1.priority
    val priority2 = s2.priority
    var res = math.signum(priority1 - priority2)
    if (res == 0) {
      val stageId1 = s1.stageId
      val stageId2 = s2.stageId
      res = math.signum(stageId1 - stageId2)
    }
    res < 0
  }
}
  • FairSchedulableBuilder :

image.png

FairSchedulerBuilder 构造的调度池嵌套多个子调度池, 每个调度池的具有自己的权重信息,高优先级的调度池内的TaskSetManager 要优先于低优先级的 TaskSetManager. 同一个调度池内的TaskSetManager 使用 prioritystageId 比较。

当 task 通过 DAGScheduler 提交过来时,TaskScheduler 首先会把他封装成 TaskSetManager, 放到调度池Pool中。然后去 SchedulerBackend 申请资源。

val manager = createTaskSetManager(taskSet, maxTaskFailures)
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
backend.reviveOffers()

SchedulerBackend 如果有资源, 会把资源转交给 TaskScheduler 让它去安排这些资源上要执行哪些作业def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]]

每个 WorkerOffer 中记录所在的executor以及这上面有多少空闲的 core.

接下来 TaskScheduler 首先对这些 WorkOffer 进行过滤, 排除掉一些黑名单上的 executor, 然后对这些 executor 混洗,使得作业安排更均匀一些。

然后从调度池中按序取出所有的 TaskSetManager.

接下来需要尝试按序为每个 TaskSetManager 中的所有 Task 分配资源。最终直到没有空闲资源或者所有的Task分配完成。

说明

  1. 资源分配是以 TaskSetManager 为序的, 当前面的 TaskSetManager 分配完所有的Task,后面的TaskSetManager才有机会分配。
  2. Task 的分配是以作业本地性的优先级依次来进行分配的, 尽量将数据与计算本地化
  3. 尽量将Task均匀的分散到所有的executor上。
/**
 * Called by cluster manager to offer resources on slaves. We respond by asking our active task
 * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
 * that tasks are balanced across the cluster.
 *
 * 这个方法由调度后端调用,调度后端会将可用的 executor 资源告诉TaskSchedulerImpl,
 * TaskSchedulerImpl 根据 TaskSet 优先级(调度池),黑名单,本地性等因素给出要实际运行的任务。
 * 我们使用 round-robin 的方式将任务分配到各个executor上,以使得计算资源的 使用更均衡。
 */
def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  // Mark each slave as alive and remember its hostname
  // Also track if new executor is added
  //  标记是否有新的可用executor加入
  var newExecAvail = false

  // 这个循环主要目的是两个:
  // 1. 更新一些簿记量,如物理节点和executor的相互映射关系,机架和host的映射关系,host和executor上运行的任务信息等等
  // 2. 检查是否有新的可用executor加入
  for (o <- offers) {
    if (!hostToExecutors.contains(o.host)) {
      hostToExecutors(o.host) = new HashSet[String]()
    }
    if (!executorIdToRunningTaskIds.contains(o.executorId)) {
      hostToExecutors(o.host) += o.executorId
      executorAdded(o.executorId, o.host)
      executorIdToHost(o.executorId) = o.host
      executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
      newExecAvail = true
    }
    for (rack <- getRackForHost(o.host)) {
      hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
    }
  }

  // Before making any offers, remove any nodes from the blacklist whose blacklist has expired. Do
  // this here to avoid a separate thread and added synchronization overhead, and also because
  // updating the blacklist is only relevant when task offers are being made.
  // 触发黑名单的超时检查,被加入黑明单的节点或executor是由一定超时时间的,
  // 在超时时间内不能像他们提交任务,而过了超时时间,这些资源将被重新投入使用.
  blacklistTrackerOpt.foreach(_.applyBlacklistTimeout())

  // 根据最新的黑名单过滤掉在黑名单中的计算资源,包括host和executor
  val filteredOffers = blacklistTrackerOpt.map { blacklistTracker =>
    offers.filter { offer =>
      !blacklistTracker.isNodeBlacklisted(offer.host) &&
        !blacklistTracker.isExecutorBlacklisted(offer.executorId)
    }
  }.getOrElse(offers)

  // 对资源进行混洗,使得分配更加均匀,使用scala库的Random进行混洗
  val shuffledOffers = shuffleOffers(filteredOffers)
  // Build a list of tasks to assign to each worker.
  // 每个executor能分配多少个任务,cores / CPUS_PER_TASK
  val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
  // 每个executor提供的cpu核数
  val availableCpus = shuffledOffers.map(o => o.cores).toArray
  // 通过调度池对所有的任务集按优先级进行排序,获取排序后的任务集
  val sortedTaskSets = rootPool.getSortedTaskSetQueue // 第二部

  // 如果有新的executor加入,需要通知每个TaskSetManager
  for (taskSet <- sortedTaskSets) {
    logDebug("parentName: %s, name: %s, runningTasks: %s".format(
      taskSet.parent.name, taskSet.name, taskSet.runningTasks))
    if (newExecAvail) {
      taskSet.executorAdded()
    }
  }

  // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
  // of locality levels so that it gets a chance to launch local tasks on all of them.
  // 按照我们的调度顺序获取每个TaskSet,然后按局部性级别的升序将其提供给每个节点,以便它有机会在所有节点上启动本地任务。
  // NOTE: 分配顺序 : PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY

  for (taskSet <- sortedTaskSets) {
    var launchedAnyTask = false
    var launchedTaskAtCurrentMaxLocality = false
    // 本地性从低到高的顺序
    for (currentMaxLocality <- taskSet.myLocalityLevels) {
      // 每个本地性级别会进行多轮分配,
      // 每一轮依次轮询每个executor,每个executor分配一个任务,
      // 这样一轮下来每个executor都会分配到一个任务,显然大多数情况下,executor的资源是不会被占满的
      // 没关系,我们会接着进行第二轮分配,知道没有资源或者在当前的本地性级别下任务被分配完了,就跳出循环
      do {
        launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
        launchedAnyTask |= launchedTaskAtCurrentMaxLocality
      } while (launchedTaskAtCurrentMaxLocality)
    }
    if (!launchedAnyTask) {
      taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
    }
  }

  if (tasks.size > 0) {
    hasLaunchedTask = true
  }
  return tasks
}

当所有的 task 分配好资源后, SchedulerBackend 将拿到分配好资源的作业信息 TaskDescription 去提交执行。 当有Task执行完成后会告知 TaskScheduler 调度新的Task进来。并且需要检查是否要调度新的 stage 进来。

作业调度时序图

image.png


参考文档 :