Spark面试问题梳理


问题一:Spark中的RDD是什么,有哪些特性?

1.RDD是什么?

RDD(Resilient Distributed Dataset)叫做分布式数据集,是spark中最基本的数据抽象,它代表一个不可变,可分区,里面的元素可以并行计算的集合

  • Dataset:就是一个集合,用于存放数据的
  • Destributed:分布式,可以并行在集群计算
  • Resilient:表示弹性的,弹性表示
    • RDD中的数据可以存储在内存或者磁盘中;
    • RDD中的分区是可以改变的;

2. 五大特性:

  1. A list of partitions:一个分区列表,RDD中的数据都存储在一个分区列表中
  2. A function for computing each split:作用在每一个分区中的函数
  3. A list of dependencies on other RDDs:一个RDD依赖于其他多个RDD,这个点很重要,RDD的容错机制就是依据这个特性而来的
  4. Optionally,a Partitioner for key-value RDDs(eg:to say that the RDD is hash-partitioned):可选的,针对于kv类型的RDD才有这个特性,作用是决定了数据的来源以及数据处理后的去向
  5. 可选项,数据本地性,数据位置最优

问题二:.概述一下spark中的常用算子区别(map,mapPartitions,foreach,foreachPatition)

常用算子:

  • map:用于遍历RDD,将函数应用于每一个元素,返回新的RDD(transformation算子)
  • foreach:用于遍历RDD,将函数应用于每一个元素,无返回值(action算子)
  • mapPatitions:用于遍历操作RDD中的每一个分区,返回生成一个新的RDD(transformation算子)
  • foreachPatition:用于遍历操作RDD中的每一个分区,无返回值(action算子)

总结:一般使用mapPatitionsforeachPatition算子比map和foreach更加高效,推荐使用

问题三:.谈谈spark中的宽窄依赖

答:RDD和它的父RDD的关系有两种类型:窄依赖宽依赖

  • 宽依赖:指的是多个子RDD的Partition会依赖同一个父RDD的Partition,关系是一对多,父RDD的一个分区的数据去到子RDD的不同分区里面,会有shuffle的产生
  • 窄依赖:指的是每一个父RDD的Partition最多被子RDD的一个partition使用,是一对一的,也就是父RDD的一个分区去到了子RDD的一个分区中,这个过程没有shuffle产生

区分的标准就是看父RDD的一个分区的数据的流向,要是流向一个partition的话就是窄依赖,否则就是宽依赖,如图所示:

img

问题四:spark中如何划分stage:

  • 概念

​ Spark任务会根据RDD之间的依赖关系,形成一个DAG有向无环图DAG会提交给DAGSchedulerDAGScheduler会把DAG划分相互依赖的多个stage,划分依据就是宽窄依赖,遇到宽依赖就划分stage,每个stage包含一个或多个task,然后将这些task以taskSet的形式提交给TaskScheduler运行,stage是由一组并行的task组成。

​ Spark程序中可以因为不同的action触发众多的job,一个程序中可以有很多的job,每一个job是由一个或者多个stage构成的,后面的stage依赖于前面的stage,也就是说只有前面依赖的stage计算完毕后,后面的stage才会运行;

​ stage 的划分标准就是宽依赖:何时产生宽依赖就会产生一个新的stage,例如reduceByKey,groupByKey,join的算子,会导致宽依赖的产生;

​ 切割规则:从后往前,遇到宽依赖就切割stage;

  • 图解

img

  • 计算格式:pipeline管道计算模式,piepeline只是一种计算思想,一种模式。

​ spark的pipeline管道计算模式相当于执行了一个高阶函数,也就是说来一条数据然后计算一条数据,会把所有的逻辑走完,然后落地,而MapReduce是1+1=2,2+1=3这样的计算模式,也就是计算完落地,然后再计算,然后再落地到磁盘或者内存,最后数据是落在计算节点上,按reduce的hash分区落地。管道计算模式完全基于内存计算,所以比MapReduce快的原因。

管道中的RDD何时落地:shuffle write的时候,对RDD进行持久化的时候。

​ stage的task的并行度是由stage的最后一个RDD的分区数来决定的,一般来说,一个partition对应一个task,但最后reduce的时候可以手动改变reduce的个数,也就是改变最后一个RDD的分区数,也就改变了并行度。例如:reduceByKey(+,3)

  • 优化:提高stage的并行度:reduceByKey(+,patition的个数) ,join(+,patition的个数)

问题五:DAGScheduler分析:

答:

  • 概述DAGScheduler是一个面向stage 的调度器;

  • 主要入参

    • dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,resultHandler, localProperties.get)
    • rdd: final RDD;
    • cleanedFunc: 计算每个分区的函数;
    • resultHander: 结果侦听器;
  • 主要功能

    1. 接受用户提交的job;

    2. 将job根据类型划分为不同的stage,记录那些RDD,stage被物化,并在每一个stage内产生一系列的task,并封装成taskset;

    3. 决定每个task的最佳位置,任务在数据所在节点上运行,并结合当前的缓存情况,将taskSet提交给TaskScheduler

    4. 重新提交shuffle输出丢失的stage给taskScheduler;

注:一个stage内部的错误不是由shuffle输出丢失造成的,DAGScheduler是不管的,由TaskScheduler负责尝试重新提交task执行。

问题六:Job的生成:

​ 一旦driver程序中出现action,就会生成一个job,比如count等,向DAGScheduler提交job,如果driver程序后面还有action,那么其他action也会对应生成相应的job,所以,driver端有多少action就会提交多少job,这可能就是为什么spark将driver程序称为application而不是job 的原因。

​ 每一个job可能会包含一个或者多个stage,最后一个stage生成result,在提交job 的过程中,DAGScheduler会首先从后往前划分stage,划分的标准就是宽依赖,一旦遇到宽依赖就划分,然后先提交没有父阶段的stage们,并在提交过程中,计算该stage的task数目以及类型,并提交具体的task,在这些无父阶段的stage提交完之后,依赖该stage 的stage才会提交。

问题七:有向无环图:

DAG,有向无环图,简单的来说,就是一个由顶点和有方向性的边构成的图中,从任意一个顶点出发,没有任意一条路径会将其带回到出发点的顶点位置,为每个spark job计算具有依赖关系的多个stage任务阶段,通常根据shuffle来划分stage,如reduceByKey,groupByKey等涉及到shuffle的transformation就会产生新的stage ,然后将每个stage划分为具体的一组任务,以TaskSets的形式提交给底层的任务调度模块来执行,其中不同stage之前的RDD为宽依赖关系,TaskScheduler任务调度模块负责具体启动任务,监控和汇报任务运行情况。

问题八:RDD是什么以及它的分类:

img

问题九:RDD的操作

img

img

img

img

img

img

img

img

img

img

问题十: RDD缓存

​ Spark可以使用 persist 和 cache 方法将任意 RDD 缓存到内存、磁盘文件系统中。缓存是容错的,如果一个 RDD 分片丢失,可以通过构建它的 transformation自动重构。被缓存的 RDD 被使用的时,存取速度会被大大加速。一般的executor内存60%做 cache, 剩下的40%做task。

​ Spark中,RDD类可以使用cache() 和 persist() 方法来缓存。cache()是persist()的特例,将该RDD缓存到内存中。而persist可以指定一个StorageLevel。StorageLevel的列表可以在StorageLevel 伴生单例对象中找到。

​ Spark的不同StorageLevel ,目的满足内存使用和CPU效率权衡上的不同需求。我们建议通过以下的步骤来进行选择:

  • 如果你的RDDs可以很好的与默认的存储级别(MEMORY_ONLY)契合,就不需要做任何修改了。这已经是CPU使用效率最高的选项,它使得RDDs的操作尽可能的快。

  • 如果不行,试着使用MEMORY_ONLY_SER并且选择一个快速序列化的库使得对象在有比较高的空间使用率的情况下,依然可以较快被访问。

  • 尽可能不要存储到硬盘上,除非计算数据集的函数,计算量特别大,或者它们过滤了大量的数据。否则,重新计算一个分区的速度,和与从硬盘中读取基本差不多快。

  • 如果你想有快速故障恢复能力,使用复制存储级别(例如:用Spark来响应web应用的请求)。所有的存储级别都有通过重新计算丢失数据恢复错误的容错机制,但是复制存储级别可以让你在RDD上持续的运行任务,而不需要等待丢失的分区被重新计算。

  • 如果你想要定义你自己的存储级别(比如复制因子为3而不是2),可以使用StorageLevel 单例对象的apply()方法。

  • 在不会使用cached RDD的时候,及时使用unpersist方法来释放它。

问题十一:RDD共享变量:

​ 在应用开发中,一个函数被传递给Spark操作(例如map和reduce),在一个远程集群上运行,它实际上操作的是这个函数用到的所有变量的独立拷贝。这些变量会被拷贝到每一台机器。通常看来,在任务之间中,读写共享变量显然不够高效。然而,Spark还是为两种常见的使用模式,提供了两种有限的共享变量:广播变量和累加器。

(1). 广播变量(Broadcast Variables)

– 广播变量缓存到各个节点的内存中,而不是每个 Task

– 广播变量被创建后,能在集群中运行的任何函数调用

– 广播变量是只读的,不能在被广播后修改

– 对于大数据集的广播, Spark 尝试使用高效的广播算法来降低通信成本

val broadcastVar = sc.broadcast(Array(1, 2, 3))方法参数中是要广播的变量
(2). 累加器

​ 累加器只支持加法操作,可以高效地并行,用于实现计数器和变量求和。Spark 原生支持数值类型和标准可变集合的计数器,但用户可以添加新的类型。只有驱动程序才能获取累加器的值

问题十二:spark-submit的时候如何引入外部jar包:

在通过spark-submit提交任务时,可以通过添加配置参数来指定

–driver-class-path 外部jar包
–jars 外部jar包

问题十三:spark如何防止内存溢出:

  • driver端的内存溢出

    • 可以增大driver的内存参数spark.driver.memory (default 1g)
      这个参数用来设置Driver的内存。在Spark程序中,SparkContext,DAGScheduler都是运行在Driver端的。对应rdd的Stage切分也是在Driver端运行,如果用户自己写的程序有过多的步骤,切分出过多的Stage,这部分信息消耗的是Driver的内存,这个时候就需要调大Driver的内存。
    • map过程产生大量对象导致内存溢出
      这种溢出的原因是在单个map中产生了大量的对象导致的,例如:rdd.map(x=>for(i <- 1 to 10000) yield i.toString),这个操作在rdd中,每个对象都产生了10000个对象,这肯定很容易产生内存溢出的问题。针对这种问题,在不增加内存的情况下,可以通过减少每个Task的大小,以便达到每个Task即使产生大量的对象Executor的内存也能够装得下。具体做法可以在会产生大量对象的map操作之前调用repartition方法,分区成更小的块传入map。例如:rdd.repartition(10000).map(x=>for(i <- 1 to 10000) yield i.toString)。
      面对这种问题注意,不能使用rdd.coalesce方法,这个方法只能减少分区,不能增加分区, 不会有shuffle的过程。
  • 数据不平衡导致内存溢出

     数据不平衡除了有可能导致内存溢出外,也有可能导致性能的问题,解决方法和上面说的类似,就是调用repartition重新分区。这里就不再累赘了。
  • shuffle后内存溢出

      shuffle内存溢出的情况可以说都是shuffle后,单个文件过大导致的。在Spark中,join,reduceByKey这一类型的过程,都会有shuffle的过程,在shuffle的使用,需要传入一个partitioner,大部分Spark中的shuffle操作,默认的partitioner都是HashPatitioner,默认值是父RDD中最大的分区数,这个参数通过spark.default.parallelism控制(在spark-sql中用spark.sql.shuffle.partitions) , spark.default.parallelism参数只对HashPartitioner有效,所以如果是别的Partitioner或者自己实现的Partitioner就不能使用spark.default.parallelism这个参数来控制shuffle的并发量了。如果是别的partitioner导致的shuffle内存溢出,就需要从partitioner的代码增加partitions的数量。
  • standalone模式下资源分配不均匀导致内存溢出

      在standalone的模式下如果配置了–total-executor-cores 和 –executor-memory 这两个参数,但是没有配置–executor-cores这个参数的话,就有可能导致,每个Executor的memory是一样的,但是cores的数量不同,那么在cores数量多的Executor中,由于能够同时执行多个Task,就容易导致内存溢出的情况。

    ​ 这种情况的解决方法就是同时配置–executor-cores或者spark.executor.cores参数,确保Executor资源分配均匀。使用rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)代替rdd.cache()
    rdd.cache()和rdd.persist(Storage.MEMORY_ONLY)是等价的,在内存不足的时候rdd.cache()的数据会丢失,再次使用的时候会重算,而rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)在内存不足的时候会存储在磁盘,避免重算,只是消耗点IO时间。

问题十四:spark中cache和persist的区别:

cache:缓存数据,默认是缓存在内存中,其本质还是调用persist
persist: 缓存数据,有丰富的数据缓存策略。数据可以保存在内存也可以保存在磁盘中,使用的时候指定对应的缓存级别就可以了。

问题十五:spark中的数据倾斜的现象,原因,后果:

(1)、数据倾斜的现象
多数task执行速度较快,少数task执行时间非常长,或者等待很长时间后提示你内存不足,执行失败。
(2)、数据倾斜的原因

  • 数据问题
    1、key本身分布不均衡(包括大量的key为空)
    2、key的设置不合理

  • spark使用问题
    1、shuffle时的并发度不够
    2、计算方式有误

(3)、数据倾斜的后果

  1. spark中的stage的执行时间受限于最后那个执行完成的task,因此运行缓慢的任务会拖垮整个程序的运行速度(分布式程序运行的速度是由最慢的那个task决定的)

  2. 过多的数据在同一个task中运行,将会把executor撑爆。

问题十六:spark数据倾斜的处理:

发现数据倾斜的时候,不要急于提高executor的资源,修改参数或是修改程序,首先要检查数据本身,是否存在异常数据。

  1. 数据问题造成的数据倾斜
    找出异常的key
    如果任务长时间卡在最后最后1个(几个)任务,首先要对key进行抽样分析,判断是哪些key造成的。 选取key,对数据进行抽样,统计出现的次数,根据出现次数大小排序取出前几个。
    比如:

    df.select(“key”).sample(false,0.1).(k=>(k,1)).reduceBykey(+).map(k=>(k._2,k._1)).sortByKey(false).take(10)

如果发现多数数据分布都较为平均,而个别数据比其他数据大上若干个数量级,则说明发生了数据倾斜。

经过分析,倾斜的数据主要有以下三种情况:

1、null(空值)或是一些无意义的信息()之类的,大多是这个原因引起。
2、无效数据,大量重复的测试数据或是对结果影响不大的有效数据。
3、有效数据,业务导致的正常数据分布。

解决办法

第1,2种情况,直接对数据进行过滤即可(因为该数据对当前业务不会产生影响)。
第3种情况则需要进行一些特殊操作,常见的有以下几种做法
(1) 隔离执行,将异常的key过滤出来单独处理,最后与正常数据的处理结果进行union操作
(2) 对key先添加随机值,进行操作后,去掉随机值,再进行一次操作。
(3) 使用reduceByKey 代替 groupByKey(reduceByKey用于对每个key对应的多个value进行merge操作,最重要的是它能够在本地先进行merge操作,并且merge操作可以通过函数自定义.)
(4) 使用map join

案例

如果使用reduceByKey因为数据倾斜造成运行失败的问题。具体操作流程如下:
(1) 将原始的 key 转化为 key + 随机值(例如Random.nextInt)
(2) 对数据进行 reduceByKey(func)
(3) 将 key + 随机值 转成 key
(4) 再对数据进行 reduceByKey(func)

案例操作流程分析

假设说有倾斜的Key,我们给所有的Key加上一个随机数,然后进行reduceByKey操作;此时同一个Key会有不同的随机数前缀,在进行reduceByKey操作的时候原来的一个非常大的倾斜的Key就分而治之变成若干个更小的Key,不过此时结果和原来不一样,怎么破?进行map操作,目的是把随机数前缀去掉,然后再次进行reduceByKey操作。(当然,如果你很无聊,可以再次做随机数前缀),这样我们就可以把原本倾斜的Key通过分而治之方案分散开来,最后又进行了全局聚合
注意1: 如果此时依旧存在问题,建议筛选出倾斜的数据单独处理。最后将这份数据与正常的数据进行union即可。
注意2: 单独处理异常数据时,可以配合使用Map Join解决。

2.spark使用不当造成的数据倾斜

  • 提高shuffle并行度
    dataFrame和sparkSql可以设置spark.sql.shuffle.partitions参数控制shuffle的并发度,默认为200。
    rdd操作可以设置spark.default.parallelism控制并发度,默认参数由不同的Cluster Manager控制。

    • 局限性: 只是让每个task执行更少的不同的key。无法解决个别key特别大的情况造成的倾斜,如果某些key的大 小非常大,即使一个task单独执行它,也会受到数据倾斜的困扰。
  • 使用map join 代替reduce join

      在小表不是特别大(取决于你的executor大小)的情况下使用,可以使程序避免shuffle的过程,自然也就没有数据倾斜的困扰了.(详细见http://blog.csdn.net/lsshlsw/article/details/50834858、http://blog.csdn.net/lsshlsw/article/details/48694893)
    • 局限性: 因为是先将小数据发送到每个executor上,所以数据量不能太大。

      问题十七:spark中map-side-join关联优化:

​ 将多份数据进行关联是数据处理过程中非常普遍的用法,不过在分布式计算系统中,这个问题往往会变的非常麻烦,因为框架提供的 join 操作一般会将所有数据根据 key 发送到所有的 reduce 分区中去,也就是 shuffle 的过程。造成大量的网络以及磁盘IO消耗,运行效率极其低下,这个过程一般被称为 reduce-side-join。

如果其中有张表较小的话,我们则可以自己实现在 map 端实现数据关联,跳过大量数据进行 shuffle 的过程,运行时间得到大量缩短,根据不同数据可能会有几倍到数十倍的性能提升。

何时使用:在海量数据中匹配少量特定数据

原理:reduce-side-join 的缺陷在于会将key相同的数据发送到同一个partition中进行运算,大数据集的传输需要长时间的IO,同时任务并发度收到限制,还可能造成数据倾斜。

reduce-side-join 运行图如下

img

map-side-join 运行图如下:

img

将少量的数据转化为Map进行广播,广播会将此 Map 发送到每个节点中,如果不进行广播,每个task执行时都会去获取该Map数据,造成了性能浪费。对大数据进行遍历,使用mapPartition而不是map,因为mapPartition是在每个partition中进行操作,因此可以减少遍历时新建broadCastMap.value对象的空间消耗,同时匹配不到的数据也不会返回。

问题十八:kafka整合sparkStreaming问题:

(1)、如何实现sparkStreaming读取kafka中的数据
可以这样说:在kafka0.10版本之前有二种方式与sparkStreaming整合,一种是基于receiver,一种是direct,然后分别阐述这2种方式分别是什么

  • receiver:是采用了kafka高级api,利用receiver接收器来接受kafka topic中的数据,从kafka接收来的数据会存储在spark的executor中,之后spark streaming提交的job会处理这些数据,kafka中topic的偏移量是保存在zk中的。
    基本使用:还有几个需要注意的点:

    ​ 在Receiver的方式中,Spark中的partition和kafka中的partition并不是相关的,所以如果我们加大每个topic的partition数量,仅仅是增加线程来处理由单一Receiver消费的主题。但是这并没有增加Spark在处理数据上的并行度.
    ​ 对于不同的Group和topic我们可以使用多个Receiver创建不同的Dstream来并行接收数据,之后可以利用union来统一成一个Dstream。
    在默认配置下,这种方式可能会因为底层的失败而丢失数据. 因为receiver一直在接收数据,在其已经通知zookeeper数据接收完成但是还没有处理的时候,executor突然挂掉(或是driver挂掉通知executor关闭),缓存在其中的数据就会丢失. 如果希望做到高可靠, 让数据零丢失,如果我们启用了Write Ahead Logs(spark.streaming.receiver.writeAheadLog.enable=true)该机制会同步地将接收到的Kafka数据写入分布式文件系统(比如HDFS)上的预写日志中. 所以, 即使底层节点出现了失败, 也可以使用预写日志中的数据进行恢复. 复制到文件系统如HDFS,那么storage level需要设置成 StorageLevel.MEMORY_AND_DISK_SER,也就是KafkaUtils.createStream(…, StorageLevel.MEMORY_AND_DISK_SER)

  • direct: 在spark1.3之后,引入了Direct方式。不同于Receiver的方式,Direct方式没有receiver这一层,其会周期性的获取Kafka中每个topic的每个partition中的最新offsets,之后根据设定的maxRatePerPartition来处理每个batch。(设置spark.streaming.kafka.maxRatePerPartition=10000。限制每秒钟从topic的每个partition最多消费的消息条数)

(2) 对比这2中方式的优缺点:

  • 采用receiver方式:这种方式可以保证数据不丢失,但是无法保证数据只被处理一次,WAL实现的是At-least-once语义(至少被处理一次),如果在写入到外部存储的数据还没有将offset更新到zookeeper就挂掉,这些数据将会被反复消费. 同时,降低了程序的吞吐量。

  • 采用direct方式: 相比Receiver模式而言能够确保机制更加健壮. 区别于使用Receiver来被动接收数据, Direct模式会周期性地主动查询Kafka, 来获得每个topic+partition的最新的offset, 从而定义每个batch的offset的范围. 当处理数据的job启动时, 就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。

    **优点**: 
    **1、简化并行读取** 
    如果要读取多个partition, 不需要创建多个输入DStream然后对它们进行union操作. Spark会创建跟Kafka partition一样多的RDD partition, 并且会并行从Kafka中读取数据. 所以在Kafka partition和RDD partition之间, 有一个一对一的映射关系.
    **2、高性能** 
    如果要保证零数据丢失, 在基于receiver的方式中, 需要开启WAL机制. 这种方式其实效率低下, 因为数据实际上被复制了两份, Kafka自己本身就有高可靠的机制, 会对数据复制一份, 而这里又会复制一份到WAL中. 而基于direct的方式, 不依赖Receiver, 不需要开启WAL机制, 只要Kafka中作了数据的复制, 那么就可以通过Kafka的副本进行恢复.
    **3、一次且仅一次的事务机制** 
    基于receiver的方式, 是使用Kafka的高阶API来在ZooKeeper中保存消费过的offset的. 这是消费Kafka数据的传统方式. 这种方式配合着WAL机制可以保证数据零丢失的高可靠性, 但是却无法保证数据被处理一次且仅一次, 可能会处理两次. 因为Spark和ZooKeeper之间可能是不同步的. 基于direct的方式, 使用kafka的简单api, Spark Streaming自己就负责追踪消费的offset, 并保存在checkpoint中. Spark自己一定是同步的, 因此可以保证数据是消费一次且仅消费一次。不过需要自己完成将offset写入zk的过程,在官方文档中都有相应介绍. 
    -*简单代码实例: 
    
    messages.foreachRDD(rdd=>{ 
    
    val message = rdd.map(_._2)//对数据进行一些操作 
    
    message.map(method)//更新zk上的offset (自己实现) 
    
    updateZKOffsets(rdd) 
    
    }) 

    sparkStreaming程序自己消费完成后,自己主动去更新zk上面的偏移量。也可以将zk中的偏移量保存在mysql或者redis数据库中,下次重启的时候,直接读取mysql或者redis中的偏移量,获取到上次消费的偏移量,接着读取数据。

问题十九:利用scala语言进行排序

1.冒泡

img

2.快读排序

img

img

问题二十:spark master在使用zookeeper进行HA时,有哪些元数据保存在zookeeper?

​ Spark通过这个参数spark.deploy.zookeeper.dir指定master元数据在zookeeper中保存的位置,包括worker,master,application,executors.standby节点要从zk中获得元数据信息,恢复集群运行状态,才能对外继续提供服务,作业提交资源申请等,在恢复前是不能接受请求的,另外,master切换需要注意两点:

​ 1. 在master切换的过程中,所有的已经在运行的程序皆正常运行,因为spark application在运行前就已经通过cluster manager获得了计算资源,所以在运行时job本身的调度和处理master是没有任何关系的;

​ 2. 在master的切换过程中唯一的影响是不能提交新的job,一方面不能提交新的应用程序给集群,因为只有Active master才能接受新的程序的提交请求,另外一方面,已经运行的程序也不能action操作触发新的job提交请求。

问题二十一:spark master HA主从切换过程不会影响集群已有的作业运行,为什么?

答:因为程序在运行之前,已经向集群申请过资源,这些资源已经提交给driver了,也就是说已经分配好资源了,这是粗粒度分配,一次性分配好资源后不需要再关心资源分配,在运行时让driver和executor自动交互,弊端是如果资源分配太多,任务运行完不会很快释放,造成资源浪费,这里不适用细粒度分配的原因是因为任务提交太慢。

问题二十二:什么是粗粒度,什么是细粒度,各自的优缺点是什么?

1.粗粒度:启动时就分配好资源,程序启动,后续具体使用就使用分配好的资源,不需要再分配资源。好处:作业特别多时,资源复用率较高,使用粗粒度。缺点:容易资源浪费,如果一个job有1000个task,完成了999个,还有一个没完成,那么使用粗粒度。如果有999个资源闲置在那里,会造成资源大量浪费。

2.细粒度:用资源的时候分配,用完了就立即回收资源,启动会麻烦一点,启动一次分配一次,会比较麻烦。

问题二十三:driver的功能是什么

1.一个spark作业运行时包括一个driver进程,也就是作业的主进程,具有main函数,并且有sparkContext的实例,是程序的入口;

2.功能:负责向集群申请资源,向master注册信息,负责了作业的调度,负责了作业的解析,生成stage并调度task到executor上,包括DAGScheduler,TaskScheduler。

问题二十四:spark的有几种部署模式,每种模式特点?

1)本地模式

Spark不一定非要跑在hadoop集群,可以在本地,起多个线程的方式来指定。将Spark应用以多线程的方式直接运行在本地,一般都是为了方便调试,本地模式分三类

· local:只启动一个executor

· local[k]:启动k个executor

· local:启动跟cpu数目相同的 executor

2)standalone模式

分布式部署集群, 自带完整的服务,资源管理和任务监控是Spark自己监控,这个模式也是其他模式的基础,

3)Spark on yarn模式

分布式部署集群,资源和任务监控交给yarn管理,但是目前仅支持粗粒度资源分配方式,包含cluster和client运行模式,cluster适合生产,driver运行在集群子节点,具有容错功能,client适合调试,dirver运行在客户端

4)Spark On Mesos模式。官方推荐这种模式(当然,原因之一是血缘关系)。正是由于Spark开发之初就考虑到支持Mesos,因此,目前而言,Spark运行在Mesos上会比运行在YARN上更加灵活,更加自然。用户可选择两种调度模式之一运行自己的应用程序:

1) 粗粒度模式(Coarse-grained Mode):每个应用程序的运行环境由一个Dirver和若干个Executor组成,其中,每个Executor占用若干资源,内部可运行多个Task(对应多少个“slot”)。应用程序的各个任务正式运行之前,需要将运行环境中的资源全部申请好,且运行过程中要一直占用这些资源,即使不用,最后程序运行结束后,回收这些资源。

2) 细粒度模式(Fine-grained Mode):鉴于粗粒度模式会造成大量资源浪费,Spark On Mesos还提供了另外一种调度模式:细粒度模式,这种模式类似于现在的云计算,思想是按需分配。

问题二十五:Spark技术栈有哪些组件,每个组件都有什么功能,适合什么应用场景?

1)Spark core:是其它组件的基础,spark的内核,主要包含:有向循环图、RDD、Lingage、Cache、broadcast等,并封装了底层通讯框架,是Spark的基础。

2)SparkStreaming是一个对实时数据流进行高通量、容错处理的流式处理系统,可以对多种数据源(如Kdfka、Flume、Twitter、Zero和TCP 套接字)进行类似Map、Reduce和Join等复杂操作,将流式计算分解成一系列短小的批处理作业。

3)Spark sql:Shark是SparkSQL的前身,Spark SQL的一个重要特点是其能够统一处理关系表和RDD,使得开发人员可以轻松地使用SQL命令进行外部查询,同时进行更复杂的数据分析

4)BlinkDB :是一个用于在海量数据上运行交互式 SQL 查询的大规模并行查询引擎,它允许用户通过权衡数据精度来提升查询响应时间,其数据的精度被控制在允许的误差范围内。

5)MLBase是Spark生态圈的一部分专注于机器学习,让机器学习的门槛更低,让一些可能并不了解机器学习的用户也能方便地使用MLbase。MLBase分为四部分:MLlib,MLI、ML Optimizer和MLRuntime。

6)GraphX是Spark中用于图和图并行计算

问题二十六:spark中worker 的主要工作是什么?

主要功能:管理当前节点内存,CPU的使用情况,接受master发送过来的资源指令,通过executorRunner启动程序分配任务,worker就类似于包工头,管理分配新进程,做计算的服务,相当于process服务,需要注意的是:

1.worker会不会汇报当前信息给master?worker心跳给master主要只有workid,不会以心跳的方式发送资源信息给master,这样master就知道worker是否存活,只有故障的时候才会发送资源信息;

2.worker不会运行代码,具体运行的是executor,可以运行具体application斜的业务逻辑代码,操作代码的节点,不会去运行代码。

问题二十七:简单说一下hadoop和spark的shuffle相同和差异?

1)从 high-level 的角度来看,两者并没有大的差别。 都是将 mapper(Spark 里是 ShuffleMapTask)的输出进行 partition,不同的 partition 送到不同的 reducer(Spark 里 reducer 可能是下一个 stage 里的 ShuffleMapTask,也可能是 ResultTask)。Reducer 以内存作缓冲区,边 shuffle 边 aggregate 数据,等到数据 aggregate 好以后进行 reduce() (Spark 里可能是后续的一系列操作)。

2)从 low-level 的角度来看,两者差别不小。 Hadoop MapReduce 是 sort-based,进入 combine() 和 reduce() 的 records 必须先 sort。这样的好处在于 combine/reduce() 可以处理大规模的数据,因为其输入数据可以通过外排得到(mapper 对每段数据先做排序,reducer 的 shuffle 对排好序的每段数据做归并)。目前的 Spark 默认选择的是 hash-based,通常使用 HashMap 来对 shuffle 来的数据进行 aggregate,不会对数据进行提前排序。如果用户需要经过排序的数据,那么需要自己调用类似 sortByKey() 的操作;如果你是Spark 1.1的用户,可以将spark.shuffle.manager设置为sort,则会对数据进行排序。在Spark 1.2中,sort将作为默认的Shuffle实现。

3)从实现角度来看,两者也有不少差别。 Hadoop MapReduce 将处理流程划分出明显的几个阶段:map(), spill, merge, shuffle, sort, reduce() 等。每个阶段各司其职,可以按照过程式的编程思想来逐一实现每个阶段的功能。在 Spark 中,没有这样功能明确的阶段,只有不同的 stage 和一系列的 transformation(),所以 spill, merge, aggregate 等操作需要蕴含在 transformation() 中。

如果我们将 map 端划分数据、持久化数据的过程称为 shuffle write,而将 reducer 读入数据、aggregate 数据的过程称为 shuffle read。那么在 Spark 中,问题就变为怎么在 job 的逻辑或者物理执行图中加入 shuffle write 和 shuffle read 的处理逻辑?以及两个处理逻辑应该怎么高效实现?

Shuffle write由于不要求数据有序,shuffle write 的任务很简单:将数据 partition 好,并持久化。之所以要持久化,一方面是要减少内存存储空间压力,另一方面也是为了 fault-tolerance。

问题二十八:Mapreduce和Spark的都是并行计算,那么他们有什么相同和区别

两者都是用mr模型来进行并行计算:

1) hadoop的一个作业称为job,job里面分为map task和reduce task,每个task都是在自己的进程中运行的,当task结束时,进程也会结束。

2) spark用户提交的任务成为application,一个application对应一个sparkcontext,app中存在多个job,每触发一次action操作就会产生一个job。这些job可以并行或串行执行,每个job中有多个stage,stage是shuffle过程中DAGSchaduler通过RDD之间的依赖关系划分job而来的,每个stage里面有多个task,组成taskset有TaskSchaduler分发到各个executor中执行,executor的生命周期是和app一样的,即使没有job运行也是存在的,所以task可以快速启动读取内存进行计算。

3) hadoop的job只有map和reduce操作,表达能力比较欠缺而且在mr过程中会重复的读写hdfs,造成大量的io操作,多个job需要自己管理关系。

spark的迭代计算都是在内存中进行的,API中提供了大量的RDD操作如join,groupby等,而且通过DAG图可以实现良好的容错。

问题二十九:RDD机制?

rdd分布式弹性数据集,简单的理解成一种数据结构,是spark框架上的通用货币。

所有算子都是基于rdd来执行的,不同的场景会有不同的rdd实现类,但是都可以进行互相转换。

rdd执行过程中会形成dag图,然后形成lineage保证容错性等。 从物理的角度来看rdd存储的是block和node之间的映射。

问题三十:spark有哪些组件?

答:主要有如下组件:

1)master:管理集群和节点,不参与计算。

2)worker:计算节点,进程本身不参与计算,和master汇报。

3)Driver:运行程序的main方法,创建spark context对象。

4)spark context:控制整个application的生命周期,包括dagsheduler和task scheduler等组件。

5)client:用户提交程序的入口。

问题三十一:spark工作机制?

答:用户在client端提交作业后,会由Driver运行main方法并创建spark context上下文。

执行add算子,形成dag图输入dagscheduler,按照add之间的依赖关系划分stage输入task scheduler。 task scheduler会将stage划分为task set分发到各个节点的executor中执行。

问题三十二:spark的优化怎么做?

答: spark调优比较复杂,但是大体可以分为三个方面来进行,

1)平台层面的调优:防止不必要的jar包分发,提高数据的本地性,选择高效的存储格式如parquet,

2)应用程序层面的调优:过滤操作符的优化降低过多小任务,降低单条记录的资源开销,处理数据倾斜,复用RDD进行缓存,作业并行化执行等等,

3)JVM层面的调优:设置合适的资源量,设置合理的JVM,启用高效的序列化方法如kyro,增大off head内存等等

​ 序列化在分布式系统中扮演着重要的角色,优化Spark程序时,首当其冲的就是对序列化方式的优化。Spark为使用者提供两种序列化方式:

  • Java serialization: 默认的序列化方式。
  • Kryo serialization: 相较于 Java serialization 的方式,速度更快,空间占用更小,但并不支持所有的序列化格式,同时使用的时候需要注册class。spark-sql中默认使用的是kyro的序列化方式。
    可以在spark-default.conf设置全局参数,也可以代码中初始化时对SparkConf设置 conf.set(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”) ,该参数会同时作用于机器之间数据的shuffle操作以及序列化rdd到磁盘,内存。
    Spark不将Kyro设置成默认的序列化方式是因为它需要对类进行注册,官方强烈建议在一些网络数据传输很大的应用中使用kyro序列化。

如果你要序列化的对象比较大,可以增加参数spark.kryoserializer.buffer所设置的值。

如果你没有注册需要序列化的class,Kyro依然可以照常工作,但会存储每个对象的全类名(full class name),这样的使用方式往往比默认的 Java serialization 还要浪费更多的空间。

可以设置 spark.kryo.registrationRequired 参数为 true,使用kyro时如果在应用中有类没有进行注册则会报错:

如上这个错误需要添加


文章作者: Leon
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Leon !
评论
 上一篇
Spark面试问题梳理:选择题 Spark面试问题梳理:选择题
1. Spark 的四大组件下面哪个不是 (D )A.Spark Streaming B. Mlib C Graphx D.Spark R 2. 下面哪个端口不是 spark 自带服务的端口 (C )A.8080 B.4040
2020-06-21
下一篇 
HiveSQL优化 hive参数版总结 HiveSQL优化 hive参数版总结
Hive SQL基本上适用大数据领域离线数据处理的大部分场景。Hive SQL的优化也是我们必须掌握的技能,而且,面试一定会问。那么,我希望面试者能答出其中的80%优化点,在这个问题上才算过关。 1. Hive优化目标 在有限的资源下,
2020-06-11
  目录