博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark Executor Driver资源调度汇总
阅读量:5779 次
发布时间:2019-06-18

本文共 6759 字,大约阅读时间需要 22 分钟。

  一、简介

  于Worker Actor于,每次LaunchExecutor这将创建一个CoarseGrainedExecutorBackend流程。Executor和CoarseGrainedExecutorBackend是1对1的关系。也就是说集群里启动多少Executor实例就有多少CoarseGrainedExecutorBackend进程

  那么究竟是怎样分配Executor的呢?怎么控制调节Executor的个数呢?

 二、Driver和Executor资源调度

   以下主要介绍一下Spark Executor分配策略:

   我们仅看。当Application提交注冊到Master后,Master会返回RegisteredApplication,之后便会调用schedule()这种方法,来分配Driver的资源。和启动Executor的资源。

schedule()方法是来调度当前可用资源的调度方法,它管理还在排队等待的Apps资源的分配。这种方法是每次在集群资源发生变动的时候都会调用,依据当前集群最新的资源来进行Apps的资源分配。

Driver资源调度:

  随机的将Driver分配到空暇的Worker上去,具体流程请看我写的凝视 :)

// First schedule drivers, they take strict precedence over applications    val shuffledWorkers = Random.shuffle(workers) // 把当前workers这个HashSet的顺序随机打乱    for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) { //遍历活着的workers      for (driver <- waitingDrivers) { //在等待队列中的Driver们会进行资源分配        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) { //当前的worker内存和cpu均大于当前driver请求的mem和cpu。则启动          launchDriver(worker, driver) //启动Driver 内部实现是发送启动Driver命令给指定Worker。Worker来启动Driver。          waitingDrivers -= driver //把启动过的Driver从队列移除        }      }    }

Executor资源调度:

 Spark默认提供了一种在各个节点进行round-robin的调度,用户能够自己设置这个flag
val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)
在介绍之前我们先介绍一个概念,
可用的Worker:什么是可用,可用就是
资源空暇足够且满足一定的
规则来启动当前App的Executor。

Spark定义了一个canUse方法:这种方法接受一个ApplicationInfo的描写叙述信息和当前Worker的描写叙述信息。
1、
当前worker的空暇内存
该app在每一个slave要占用的内存 (executor.memory默认512M)
 
2、当前app从未在此worker启动过App
总结: 从这点看出。要满足:该Worker的当前可用最小内存要比配置的executor内存大,而且对于同一个App仅仅能在一个Worker里启动一个Exeutor。假设要启动第二个Executor。那么请到其他Worker里。

这种才算是对App可用的Worker。

/**   * Can an app use the given worker?

True if the worker has enough memory and we haven't already * launched an executor for the app on it (right now the standalone backend doesn't like having * two executors on the same worker). */ def canUse(app: ApplicationInfo, worker: WorkerInfo): Boolean = { worker.memoryFree >= app.desc.memoryPerSlave && !worker.hasExecutor(app) }

SpreadOut分配策略:

SpreadOut分配策略是一种以round-robin方式遍历集群全部可用Worker。分配Worker资源,来启动创建Executor的策略。优点是尽可能的将cores分配到各个节点,最大化负载均衡和高并行。

以下看看,默认的spreadOutApps模式启动App的过程: 
 1、等待分配资源的apps队列默认是FIFO的。
 2、app.coresLeft表示的是该app还有cpu资源没申请到:  
app.coresLeft  = 当前app申请的maxcpus - granted的cpus
 3、遍历未分配全然的apps,继续给它们分配资源,
 4、usableWorkers =  从当前ALIVE的Workers中过滤找出上文描写叙述的可用Worker。然后依据cpus的资源空暇,从大到小给Workers排序。
 5、当toAssign(即将要分配的的core数>0,就找到能够的Worker持续分配)
 6、当可用Worker的free cores 大于 眼下该Worker已经分配的core时,再给它分配1个core,这样分配是非常平均的方法。
 7、round-robin轮询可用的Worker循环
 8、toAssign=0时结束循环。開始依据分配策略去真正的启动Executor。

举例: 1个APP申请了6个core, 如今有2个Worker可用。
      那么:
toAssign = 6,assigned = 2 
 那么就会在assigned(1)和assigned(0)中轮询平均分配cores,以+1 core的方式,终于每一个Worker分到3个core。即每一个Worker的启动一个Executor。每一个Executor获得3个cores。
// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app    // in the queue, then the second app, etc.    if (spreadOutApps) {      // Try to spread out each app among all the nodes, until it has all its cores      for (app <- waitingApps if app.coresLeft > 0) { //对还未被全然分配资源的apps处理        val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)          .filter(canUse(app, _)).sortBy(_.coresFree).reverse //依据core Free对可用Worker进行降序排序。        val numUsable = usableWorkers.length //可用worker的个数 eg:可用5个worker        val assigned = new Array[Int](numUsable) //候选Worker,每一个Worker一个下标,是一个数组,初始化默认都是0        var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)//还要分配的cores = 集群中可用Worker的可用cores总和(10)。 当前未分配core(5)中找最小的        var pos = 0        while (toAssign > 0) {           if (usableWorkers(pos).coresFree - assigned(pos) > 0) { //以round robin方式在全部可用Worker里推断当前worker空暇cpu是否大于当前数组已经分配core值            toAssign -= 1            assigned(pos) += 1 //当前下标pos的Worker分配1个core +1          }          pos = (pos + 1) % numUsable //round-robin轮询寻找有资源的Worker        }        // Now that we've decided how many cores to give on each node, let's actually give them        for (pos <- 0 until numUsable) {          if (assigned(pos) > 0) { //假设assigned数组中的值>0,将启动一个executor在。指定下标的机器上。            val exec = app.addExecutor(usableWorkers(pos), assigned(pos)) //更新app里的Executor信息            launchExecutor(usableWorkers(pos), exec)  //通知可用Worker去启动Executor            app.state = ApplicationState.RUNNING          }        }      }    } else {

非SpreadOut分配策略:

非SpreadOut策略。该策略:会尽可能的依据每一个Worker的剩余资源来启动Executor,这样启动的Executor可能仅仅在集群的一小部分机器的Worker上。这样做对node较少的集群还可以,集群规模大了。Executor的并行度和机器负载均衡就不可以保证了。

当用户设定了參数
spark.deploy.spreadOut
false时,触发此游戏分支
偷笑,跑个题,有些困了。

1、遍历可用Workers
2、且遍历Apps
3、比較当前Worker的可用core和app还须要分配的core。取最小值当做还须要分配的core
4、假设coreToUse大于0。则直接拿可用的core来启动Executor。。

奉献当前Worker所有资源。(Ps:挨个榨干每一个Worker的剩余资源。。。。

举例: App申请12个core,3个Worker。Worker1剩余1个core, Worke2r剩7个core, Worker3剩余4个core.
这样会启动3个Executor。Executor1 占用1个core, Executor2占用7个core, Executor3占用4个core.
总结:这样是尽可能的满足App,让其尽快运行,而忽略了其并行效率和负载均衡。
} else {      // Pack each app into as few nodes as possible until we've assigned all its cores      for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {        for (app <- waitingApps if app.coresLeft > 0) {          if (canUse(app, worker)) { //直接问当前worker是有空暇的core            val coresToUse = math.min(worker.coresFree, app.coresLeft) //有则取。无论多少            if (coresToUse > 0) { //有              val exec = app.addExecutor(worker, coresToUse) //直接启动              launchExecutor(worker, exec)              app.state = ApplicationState.RUNNING            }          }        }      }    }  }

三、总结:

 1、 在Worker Actor中。每次LaunchExecutor会创建一个CoarseGrainedExecutorBackend进程,一个Executor相应一个CoarseGrainedExecutorBackend

 2、针对同一个App。每一个Worker里仅仅能有一个针对该App的Executor存在。切记。

假设想让整个App的Executor变多,设置SPARK_WORKER_INSTANCES。让Worker变多。

 3、Executor的资源分配有2种策略:

3.1、SpreadOut :一种以round-robin方式遍历集群全部可用Worker。分配Worker资源。来启动创建Executor的策略,优点是尽可能的将cores分配到各个节点。最大化负载均衡和高并行。

3.2、非SpreadOut:会尽可能的依据每一个Worker的剩余资源来启动Executor,这样启动的Executor可能仅仅在集群的一小部分机器的Worker上。这样做对node较少的集群还可以,集群规模大了。Executor的并行度和机器负载均衡就不可以保证了。

行文仓促,如有不正之处,请指出,欢迎讨论 :)

补充:

1、关于:   一个App一个Worker为什么仅仅有同意有针对该App的一个Executor 究竟这样设计为何? 的讨论:

:Spark是线程级并行模型。为什么须要一个worker为一个app启动多个executor呢?

:一个worker相应一个executorbackend是从mesos那一套迁移过来的,mesos下也是一个slave一个executorbackend。我理解这里是能够实现起多个,但起多个貌似没什么优点,并且添加了复杂度。

 做了一个patch能够启动多个,可是还没有被merge。 从Yarn的角度考虑的话,一个Worker能够相应多个executorbackend,正如一个nodemanager相应多个container。  

:回复: 假设一个executor太大且装的对象太多。会导致GC非常慢,多几个Executor会降低full gc慢的问题。 see this post (今天 11:25)

:回复:哦。这个考虑是有道理的。

一个workaround是单台机器部署多个worker。worker相对来说比較便宜。

 

:回复:看来都还在变化其中,standalone 和 YARN 还是有非常多不同,我们暂不下结论 (今天 11:35)

:问题開始变得复杂了,是提高线程并行度还是提高进程并行度?我想 Spark 还是优先选择前者,这样 task 好管理。并且 broadcast,cache 的效率高些。后者有一些道理。但參数配置会变得更复杂,各有利弊吧 (今天 11:40)

未完待续。。

传送门:@JerrLead  

——EOF——

原创文章。转载请注明来自:

你可能感兴趣的文章
自己写spring boot starter
查看>>
花钱删不完负面消息
查看>>
JBPM之JPdl小叙
查看>>
(step6.1.5)hdu 1233(还是畅通工程——最小生成树)
查看>>
Membership三步曲之进阶篇 - 深入剖析Provider Model
查看>>
huffman编码——原理与实现
查看>>
Linux移植随笔:终于解决Tslib的问题了【转】
查看>>
MyBitis(iBitis)系列随笔之四:多表(多对一查询操作)
查看>>
【leetcode】Longest Common Prefix
查看>>
前端优化及相关要点总结
查看>>
Vue 列表渲染
查看>>
struts2中form提交到action中的中文参数乱码问题解决办法(包括取中文路径)
查看>>
25 个精美的手机网站模板
查看>>
C#反射实例应用--------获取程序集信息和通过类名创建类实例
查看>>
VC中实现文字竖排的简单方法
查看>>
会话标识未更新
查看>>
【设计模式】数据访问对象模式
查看>>
Tomcat8 配置Oracle11g数据源
查看>>
【PHP面向对象(OOP)编程入门教程】8.构造方法__construct()与析构方法__destruct()
查看>>
ThinkPHP常用配置路径
查看>>