Spark面试问题梳理:选择题


1. Spark 的四大组件下面哪个不是 (D )

A.Spark Streaming B. Mlib

C Graphx D.Spark R

2. 下面哪个端口不是 spark 自带服务的端口 (C )

A.8080 B.4040 C.8090 D.18080

备注:8080:spark集群web ui端口,4040:sparkjob监控端口,18080:jobhistory端口

3. spark 1.4 版本的最大变化 (B )

A spark sql Release 版本 B .引入 Spark R

C DataFrame D.支持动态资源分配

4. Spark Job 默认的调度模式 (A )

A FIFO B FAIR

C 无 D 运行时指定

备注:Spark中的调度模式主要有两种:FIFO和FAIR。默认情况下Spark的调度模式是FIFO(先进先出),谁先提交谁先执行,后面的任务需要等待前面的任务执行。而FAIR(公平调度)模式支持在调度池中为任务进行分组,不同的调度池权重不同,任务可以按照权重来决定执行顺序。使用哪种调度器由参数spark.scheduler.mode来设置,可选的参数有FAIR和FIFO,默认是FIFO。

5.哪个不是本地模式运行的条件 ( D)

A spark.localExecution.enabled=true

B 显式指定本地运行

C finalStage 无父 Stage

D partition默认值

备注:【问题】Spark在windows能跑集群模式吗?

我认为是可以的,但是需要详细了解cmd命令行的写法。目前win下跑spark的单机模式是没有问题的。

【关键点】spark启动机制容易被windows的命令行cmd坑

  1、带空格、奇怪字符的安装路径,cmd不能识别。最典型的坑就是安装在Program Files文件夹下的程序,因为Program和Files之间有个空格,所以cmd竟不能识别。之前就把JDK安装在了Program Files下面,然后启动spark的时候,总是提示我找不到JDK。我明明配置了环境变量了啊?这就是所谓了《已经配置环境变量,spark 仍然找不到Java》的错误问题。至于奇怪的字符,如感叹号!,我经常喜欢用来将重要的文件夹排在最前面,但cmd命令提示符不能识别。

  2、是否需要配置hadoop的路径的问题——答案是需要用HDFS或者yarn就配,不需要用则不需配置。目前大多数的应用场景里面,Spark大规模集群基本安装在Linux服务器上,而自己用windows跑spark的情景,则大多基于学习或者实验性质,如果我们所要读取的数据文件从本地windows系统的硬盘读取(比如说d:\data\ml.txt),基本上不需要配置hadoop路径。我们都知道,在编spark程序的时候,可以指定spark的启动模式,而启动模式有这么三中(以python代码举例):

   (2.1)本地情况,conf = SparkConf().setMaster(“local[*]”) ——>也就是拿本机的spark来跑程序

   (2.2)远程情况,conf = SparkConf().setMaster(“spark://remotehost:7077”) ——>远程spark主机

   (2.3)yarn情况,conf = SparkConf().setMaster(“yarn-client”) ——>远程或本地 yarn集群代理spark

针对这3种情况,配置hadoop安装路径都有什么作用呢?(2.1)本地的情况,直接拿本机安装的spark来运行spark程序(比如d:\spark-1.6.2),则配不配制hadoop路径取决于是否需要使用hdfs。java程序的情况就更为简单,只需要导入相应的hadoop的jar包即可,是否配置hadoop路径并不重要。(2.2)的情况大体跟(2.1)的情况相同,虽然使用的远程spark,但如果使用本地数据,则运算的元数据也是从本地上传到远程spark集群的,无需配置hdfs。而(2.3)的情况就大不相同,经过我搜遍baidu、google、bing引擎,均没找到SparkConf直接配置远程yarn地址的方法,唯一的一个帖子介绍可以使用yarn://remote:8032的形式,则会报错“无法解析 地址”。查看Spark的官方说明,Spark其实是通过hadoop路径下的etc\hadoop文件夹中的配置文件来寻找yarn集群的。因此,需要使用yarn来运行spark的情况,在spark那配置好hadoop的目录就尤为重要。后期经过虚拟机的验证,表明,只要windows本地配置的host地址等信息与linux服务器端相同(注意应更改hadoop-2/etc/hadoop 下各种文件夹的配置路径,使其与windows本地一致),是可以直接在win下用yarn-client提交spark任务到远程集群的。

3、是否需要配置环境变量的问题,若初次配置,可以考虑在IDE里面配置,或者在程序本身用setProperty函数进行配置。因为配置windows下的hadoop、spark环境是个非常头疼的问题,有可能路径不对而导致无法找到相应要调用的程序。待实验多次成功率提高以后,再直接配置windows的全局环境变量不迟。

  4、使用Netbeans这个IDE的时候,有遇到Netbeans不能清理构建的问题。原因,极有可能是导入了重复的库,spark里面含有hadoop包,记得检查冲突。同时,在清理构建之前,记得重新编译一遍程序,再进行清理并构建。

  5、经常遇到WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources资源不足无法运行的问题,添加conf.set(“spark.executor.memory”, “512m”);语句进行资源限制。先前在虚拟机跑spark,由于本身机子性能不高,给虚拟机设置的内存仅仅2G,导致hadoop和spark双开之后系统资源严重不足。因此可以缩小每个executor的运算规模。其他资源缺乏问题的解决方法参考http://blog.sina.com.cn/s/blog_4b1452dd0102wyzo.html

6.下面哪个不是 RDD 的特点 (C )

A. 可分区 B 可序列化 C 可修改 D 可持久化

7. 关于广播变量,下面哪个是错误的 (D )

A 任何函数调用 B 是只读的

C 存储在各个节点 D 存储在磁盘或 HDFS

8. 关于累加器,下面哪个是错误的 (D )

A 支持加法 B 支持数值类型

C 可并行 D 不支持自定义类型

9.Spark 支持的分布式部署方式中哪个是错误的 (D )

A standalone B spark on mesos

C spark on YARN D Spark on local

10.Stage 的 Task 的数量由什么决定 (A )

A Partition B Job C Stage D TaskScheduler

11.下面哪个操作是窄依赖 (B )

A join B filter

C group D sort

12.下面哪个操作肯定是宽依赖 (C )

A map B flatMap

C reduceByKey D sample

13.spark 的 master 和 worker 通过什么方式进行通信的? (D )

A http B nio C netty D Akka

备注:从spark1.3.1之后,netty完全代替 了akka

一直以来,基于Akka实现的RPC通信框架是Spark引以为豪的主要特性,也是与Hadoop等分布式计算框架对比过程中一大亮点,但是时代和技术都在演化,从Spark1.3.1版本开始,为了解决大数据块(如shuffle)的传输问题,Spark引入了Netty通信框架,到了1.6.0版本,Netty居然完全取代了Akka,承担Spark内部所有的RPC通信以及数据流传输。

那么Akka又是什么东西?从Akka出现背景来说,它是基于Actor的RPC通信系统,它的核心概念也是Message,它是基于协程的,性能不容置疑;基于scala的偏函数,易用性也没有话说,但是它毕竟只是RPC通信,无法适用大的package/stream的数据传输,这也是Spark早期引入Netty的原因。

那么Netty为什么可以取代Akka?首先不容置疑的是Akka可以做到的,Netty也可以做到,但是Netty可以做到,Akka却无法做到,原因是啥?在软件栈中,Akka相比Netty要Higher一点,它专门针对RPC做了很多事情,而Netty相比更加基础一点,可以为不同的应用层通信协议(RPC,FTP,HTTP等)提供支持,在早期的Akka版本,底层的NIO通信就是用的Netty;其次一个优雅的工程师是不会允许一个系统中容纳两套通信框架,恶心!最后,虽然Netty没有Akka协程级的性能优势,但是Netty内部高效的Reactor线程模型,无锁化的串行设计,高效的序列化,零拷贝,内存池等特性也保证了Netty不会存在性能问题。

那么Spark是怎么用Netty来取代Akka呢?一句话,利用偏函数的特性,基于Netty“仿造”出一个简约版本的Actor模型!!

14. 默认的存储级别 (A )

A MEMORY_ONLY B MEMORY_ONLY_SER

C MEMORY_AND_DISK D MEMORY_AND_DISK_SER

备注:

//不会保存任务数据 
val NONE = new StorageLevel(false, false, false, false) //直接将RDD的partition保存在该节点的Disk上 
val DISK_ONLY = new StorageLevel(true, false, false, false) //直接将RDD的partition保存在该节点的Disk上,在其他节点上保存一个相同的备份 
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2) //将RDD的partition对应的原生的Java Object保存在JVM中,如果RDD太大导致它的部分partition不能存储在内存中 //那么这些partition将不会缓存,并且需要的时候被重新计算,默认缓存的级别 
val MEMORY_ONLY = new StorageLevel(false, true, false, true) //将RDD的partition对应的原生的Java Object保存在JVM中,在其他节点上保存一个相同的备份 
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2) 
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false) val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2) //将RDD的partition反序列化后的对象存储在JVM中,如果RDD太大导致它的部分partition不能存储在内存中 //超出的partition将被保存在Disk上,并且在需要时读取 
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true) //在其他节点上保存一个相同的备份 
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2) val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false) val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) //将RDD的partition序列化后存储在Tachyon中 
val OFF_HEAP = new StorageLevel(false, false, true, false)

15 spark.deploy.recoveryMode 不支持那种 (D )

A.ZooKeeper B. FileSystem

D NONE D Hadoop

16.下列哪个不是 RDD 的缓存方法 (C )

A persist() B Cache()

C Memory()

17.Task 运行在下来哪里个选项中 Executor 上的工作单元 (C )

A Driver program B. spark master

C.worker node D Cluster manager

18.hive 的元数据存储在 derby 和 MySQL 中有什么区别 (B )

A.没区别 B.多会话

C.支持网络环境 D数据库的区别

备注:  Hive 将元数据存储在 RDBMS 中,一般常用 MySQL 和 Derby。默认情况下,Hive 元数据保存在内嵌的 Derby 数据库中,只能允许一个会话连接,只适合简单的测试。实际生产环境中不适用, 为了支持多用户会话,则需要一个独立的元数据库,使用 MySQL 作为元数据库,Hive 内部对 MySQL 提供了很好的支持。

内置的derby主要问题是并发性能很差,可以理解为单线程操作。

Derby还有一个特性。更换目录执行操作,会找不到相关表等

19.DataFrame 和 RDD 最大的区别 (B )

A.科学统计支持 B.多了 schema

C.存储方式不一样 D.外部数据源支持

备注:

上图直观体现了RDD与DataFrame的区别:左侧的RDD[Person]虽然以Person为类型参数,但Spark框架本身不了解Person类的内部结构。而右侧的DataFrame却提供了详细的结构信息,使得Spark SQL可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么。DataFrame多了数据的结构信息,即schema。RDD是分布式的Java对象的集合。DataFrame是分布式的Row对象的集合。DataFrame除了提供了比RDD更丰富的算子以外,更重要的特点是提升执行效率、减少数据读取以及执行计划的优化,比如filter下推、裁剪等。

提升执行效率: RDD API是函数式的,强调不变性,在大部分场景下倾向于创建新对象而不是修改老对象。这一特点虽然带来了干净整洁的API,却也使得Spark应用程序在运行期倾向于创建大量临时对象,对GC造成压力。在现有RDD API的基础之上,我们固然可以利用mapPartitions方法来重载RDD单个分片内的数据创建方式,用复用可变对象的方式来减小对象分配和GC的开销,但这牺牲了代码的可读性,而且要求开发者对Spark运行时机制有一定的了解,门槛较高。另一方面,Spark SQL在框架内部已经在各种可能的情况下尽量重用对象,这样做虽然在内部会打破了不变性,但在将数据返回给用户时,还会重新转为不可变数据。利用 DataFrame API进行开发,可以免费地享受到这些优化效果。

减少数据读取:分析大数据,最快的方法就是 ——忽略它。这里的“忽略”并不是熟视无睹,而是根据查询条件进行恰当的剪枝。

上文讨论分区表时提到的分区剪 枝便是其中一种——当查询的过滤条件中涉及到分区列时,我们可以根据查询条件剪掉肯定不包含目标数据的分区目录,从而减少IO。

对于一些“智能”数据格 式,Spark SQL还可以根据数据文件中附带的统计信息来进行剪枝。简单来说,在这类数据格式中,数据是分段保存的,每段数据都带有最大值、最小值、null值数量等 一些基本的统计信息。当统计信息表名某一数据段肯定不包括符合查询条件的目标数据时,该数据段就可以直接跳过(例如某整数列a某段的最大值为100,而查询条件要求a > 200)。

此外,Spark SQL也可以充分利用RCFile、ORC、Parquet等列式存储格式的优势,仅扫描查询真正涉及的列,忽略其余列的数据。

为了说明查询优化,我们来看上图展示的人口数据分析的示例。图中构造了两个DataFrame,将它们join之后又做了一次filter操作。如果原封不动地执行这个执行计划,最终的执行效率是不高的。因为join是一个代价较大的操作,也可能会产生一个较大的数据集。如果我们能将filter下推到 join下方,先对DataFrame进行过滤,再join过滤后的较小的结果集,便可以有效缩短执行时间。而Spark SQL的查询优化器正是这样做的。简而言之,逻辑查询计划优化就是一个利用基于关系代数的等价变换,将高成本的操作替换为低成本操作的过程。

得到的优化执行计划在转换成物 理执行计划的过程中,还可以根据具体的数据源的特性将过滤条件下推至数据源内。最右侧的物理执行计划中Filter之所以消失不见,就是因为溶入了用于执行最终的读取操作的表扫描节点内。

对于普通开发者而言,查询优化 器的意义在于,即便是经验并不丰富的程序员写出的次优的查询,也可以被尽量转换为高效的形式予以执行。

  • RDD和Dataset

    ​ DataSet以Catalyst逻辑执行计划表示,并且数据以编码的二进制形式被存储,不需要反序列化就可以执行sorting、shuffle等操作。

    ​ DataSet创立需要一个显式的Encoder,把对象序列化为二进制,可以把对象的scheme映射为Spark

    SQl类型,然而RDD依赖于运行时反射机制。

  • DataFrame和Dataset

    ​ Dataset可以认为是DataFrame的一个特例,主要区别是Dataset每一个record存储的是一个强类型值而不是一个Row。因此具有如下三个特点:

    ​ DataSet可以在编译时检查类型

    并且是面向对象的编程接口。

20.Master 的 ElectedLeader 事件后做了哪些操作 (D )

A. 通知 driver B.通知 worker

C.注册 application D.直接 ALIVE

34.cache后面能不能接其他算子,它是不是action操作?

答:cache可以接其他算子,但是接了算子之后,起不到缓存应有的效果,因为会重新触发cache。

cache不是action操作

35.reduceByKey是不是action?

答:不是,很多人都会以为是action,reduce rdd是action

36.数据本地性是在哪个环节确定的?

具体的task运行在那他机器上,dag划分stage的时候确定的

37.RDD的弹性表现在哪几点?

1)自动的进行内存和磁盘的存储切换;

2)基于Lingage的高效容错;

3)task如果失败会自动进行特定次数的重试;

4)stage如果失败会自动进行特定次数的重试,而且只会计算失败的分片;

5)checkpoint和persist,数据计算之后持久化缓存

6)数据调度弹性,DAG TASK调度和资源无关

7)数据分片的高度弹性,a.分片很多碎片可以合并成大的,b.par

38.常规的容错方式有哪几种类型?

1).数据检查点,会发生拷贝,浪费资源

2).记录数据的更新,每次更新都会记录下来,比较复杂且比较消耗性能

39.RDD通过Linage(记录数据更新)的方式为何很高效?

1)lazy记录了数据的来源,RDD是不可变的,且是lazy级别的,且rDD

之间构成了链条,lazy是弹性的基石。由于RDD不可变,所以每次操作就

产生新的rdd,不存在全局修改的问题,控制难度下降,所有有计算链条

将复杂计算链条存储下来,计算的时候从后往前回溯

900步是上一个stage的结束,要么就checkpoint

2)记录原数据,是每次修改都记录,代价很大

如果修改一个集合,代价就很小,官方说rdd是

粗粒度的操作,是为了效率,为了简化,每次都是

操作数据集合,写或者修改操作,都是基于集合的

rdd的写操作是粗粒度的,rdd的读操作既可以是粗粒度的

也可以是细粒度,读可以读其中的一条条的记录。

3)简化复杂度,是高效率的一方面,写的粗粒度限制了使用场景

如网络爬虫,现实世界中,大多数写是粗粒度的场景

40.RDD有哪些缺陷?

1)不支持细粒度的写和更新操作(如网络爬虫),spark写数据是粗粒度的

所谓粗粒度,就是批量写入数据,为了提高效率。但是读数据是细粒度的也就是

说可以一条条的读

2)不支持增量迭代计算,Flink支持

41.说一说Spark程序编写的一般步骤?

答:初始化,资源,数据源,并行化,rdd转化,action算子打印输出结果或者也可以存至相应的数据存储介质,具体的可看下图:

file:///E:/%E5%AE%89%E8%A3%85%E8%BD%AF%E4%BB%B6/%E6%9C%89%E9%81%93%E7%AC%94%E8%AE%B0%E6%96%87%E4%BB%B6/qq19B99AF2399E52F466CC3CF7E3B24ED5/069fa7b471f54e038440faf63233acce/640.webp

42. Spark有哪两种算子?

答:Transformation(转化)算子和Action(执行)算子。

43. Spark提交你的jar包时所用的命令是什么?

答:spark-submit。

44. Spark有哪些聚合类的算子,我们应该尽量避免什么类型的算子?

答:在我们的开发过程中,能避免则尽可能避免使用reduceByKey、join、distinct、repartition等会进行shuffle的算子,尽量使用map类的非shuffle算子。这样的话,没有shuffle操作或者仅有较少shuffle操作的Spark作业,可以大大减少性能开销。

45. 你所理解的Spark的shuffle过程?

答:从下面三点去展开

1)shuffle过程的划分

2)shuffle的中间结果如何存储

3)shuffle的数据如何拉取过来

可以参考这篇博文:http://www.cnblogs.com/jxhd1/p/6528540.html

Shuffle后续优化方向:通过上面的介绍,我们了解到,Shuffle过程的主要存储介质是磁盘,尽量的减少IO是Shuffle的主要优化方向。我们脑海中都有那个经典的存储金字塔体系,Shuffle过程为什么把结果都放在磁盘上,那是因为现在内存再大也大不过磁盘,内存就那么大,还这么多张嘴吃,当然是分配给最需要的了。如果具有“土豪”内存节点,减少Shuffle IO的最有效方式无疑是尽量把数据放在内存中。下面列举一些现在看可以优化的方面,期待经过我们不断的努力,TDW计算引擎运行地更好。

MapReduce Shuffle后续优化方向:压缩:对数据进行压缩,减少写读数据量;

减少不必要的排序:并不是所有类型的Reduce需要的数据都是需要排序的,排序这个nb的过程如果不需要最好还是不要的好;
内存化:Shuffle的数据不放在磁盘而是尽量放在内存中,除非逼不得已往磁盘上放;当然了如果有性能和内存相当的第三方存储系统,那放在第三方存储系统上也是很好的;这个是个大招;
网络框架:netty的性能据说要占优了;
本节点上的数据不走网络框架:对于本节点上的Map输出,Reduce直接去读吧,不需要绕道网络框架。
Spark Shuffle后续优化方向:Spark作为MapReduce的进阶架构,对于Shuffle过程已经是优化了的,特别是对于那些具有争议的步骤已经做了优化,但是Spark的Shuffle对于我们来说在一些方面还是需要优化的。

压缩:对数据进行压缩,减少写读数据量;
内存化:Spark历史版本中是有这样设计的:Map写数据先把数据全部写到内存中,写完之后再把数据刷到磁盘上;考虑内存是紧缺资源,后来修改成把数据直接写到磁盘了;对于具有较大内存的集群来讲,还是尽量地往内存上写吧,内存放不下了再放磁盘。

46. 你如何从Kafka中获取数据?

1) 基于Receiver的方式

这种方式使用Receiver来获取数据。Receiver是使用Kafka的高层次Consumer API来实现的。receiver从Kafka中获取的数据都是存储在Spark Executor的内存中的,然后Spark Streaming启动的job会去处理那些数据。

2) 基于Direct的方式

这种新的不基于Receiver的直接方式,是在Spark 1.3中引入的,从而能够确保更加健壮的机制。替代掉使用Receiver来接收数据后,这种方式会周期性地查询Kafka,来获得每个topic+partition的最新的offset,从而定义每个batch的offset的范围。当处理数据的job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据

47. 对于Spark中的数据倾斜问题你有什么好的方案?

1)前提是定位数据倾斜,是OOM了,还是任务执行缓慢,看日志,看WebUI

2)解决方法,有多个方面

· 避免不必要的shuffle,如使用广播小表的方式,将reduce-side-join提升为map-side-join

·分拆发生数据倾斜的记录,分成几个部分进行,然后合并join后的结果

·改变并行度,可能并行度太少了,导致个别task数据压力大

·两阶段聚合,先局部聚合,再全局聚合

·自定义paritioner,分散key的分布,使其更加均匀

详细解决方案参考博文《Spark数据倾斜优化方法》

48.RDD创建有哪几种方式?

1).使用程序中的集合创建rdd

2).使用本地文件系统创建rdd

3).使用hdfs创建rdd,

4).基于数据库db创建rdd

5).基于Nosql创建rdd,如hbase

6).基于s3创建rdd,

7).基于数据流,如socket创建rdd

如果只回答了前面三种,是不够的,只能说明你的水平还是入门级的,实践过程中有很多种创建方式。

49.Spark并行度怎么设置比较合适

答:spark并行度,每个core承载24个partition,如,32个core,那么64128之间的并行度,也就是

设置64~128个partion,并行读和数据规模无关,只和内存使用量和cpu使用

时间有关

50.Spark中数据的位置是被谁管理的?

答:每个数据分片都对应具体物理位置,数据的位置是被blockManager,无论

数据是在磁盘,内存还是tacyan,都是由blockManager管理

答:Spark中的数据本地性有三种:

a.PROCESS_LOCAL是指读取缓存在本地节点的数据

b.NODE_LOCAL是指读取本地节点硬盘数据

c.ANY是指读取非本地节点数据

通常读取数据PROCESS_LOCAL>NODE_LOCAL>ANY,尽量使数据以PROCESS_LOCAL或NODE_LOCAL方式读取。其中PROCESS_LOCAL还和cache有关,如果RDD经常用的话将该RDD cache到内存中,注意,由于cache是lazy的,所以必须通过一个action的触发,才能真正的将该RDD cache到内存中。

52.rdd有几种操作类型?

1)transformation,rdd由一种转为另一种rdd

2)action,

3)cronroller,crontroller是控制算子,cache,persist,对性能和效率的有很好的支持

三种类型,不要回答只有2中操作

53.Spark如何处理不能被序列化的对象?

将不能序列化的内容封装成object

54.collect功能是什么,其底层是怎么实现的?

答:driver通过collect把集群中各个节点的内容收集过来汇总成结果,collect返回结果是Array类型的,collect把各个节点上的数据抓过来,抓过来数据是Array型,collect对Array抓过来的结果进行合并,合并后Array中只有一个元素,是tuple类型(KV类型的)的。

55.Spaek程序执行,有时候默认为什么会产生很多task,怎么修改默认task执行个数?

答:

1)因为输入数据有很多task,尤其是有很多小文件的时候,有多少个输入block就会有多少个task启动;

2)spark中有partition的概念,每个partition都会对应一个task,task越多,在处理大规模数据的时候,就会越有效率。不过task并不是越多越好,如果平时测试,或者数据量没有那么大,则没有必要task数量太多。

3)参数可以通过spark_home/conf/spark-default.conf配置文件设置:

spark.sql.shuffle.partitions 50 spark.default.parallelism 10

第一个是针对spark sql的task数量

第二个是非spark sql程序设置生效

56.为什么Spark Application在没有获得足够的资源,job就开始执行了,可能会导致什么什么问题发生?

答:会导致执行该job时候集群资源不足,导致执行job结束也没有分配足够的资源,分配了部分Executor,该job就开始执行task,应该是task的调度线程和Executor资源申请是异步的;如果想等待申请完所有的资源再执行job的:需要将spark.scheduler.maxRegisteredResourcesWaitingTime设置的很大;spark.scheduler.minRegisteredResourcesRatio 设置为1,但是应该结合实际考虑

否则很容易出现长时间分配不到资源,job一直不能运行的情况。

57.map与flatMap的区别

map:对RDD每个元素转换,文件中的每一行数据返回一个数组对象

flatMap:对RDD每个元素转换,然后再扁平化

将所有的对象合并为一个对象,文件中的所有行数据仅返回一个数组

对象,会抛弃值为null的值

58.列举你常用的action?

collect,reduce,take,count,saveAsTextFile等

59.Spark为什么要持久化,一般什么场景下要进行persist操作?

为什么要进行持久化?

spark所有复杂一点的算法都会有persist身影,spark默认数据放在内存,spark很多内容都是放在内存的,非常适合高速迭代,1000个步骤

只有第一个输入数据,中间不产生临时数据,但分布式系统风险很高,所以容易出错,就要容错,rdd出错或者分片可以根据血统算出来,如果没有对父rdd进行persist 或者cache的化,就需要重头做。

以下场景会使用persist

1)某个步骤计算非常耗时,需要进行persist持久化

2)计算链条非常长,重新恢复要算很多步骤,很好使,persist

3)checkpoint所在的rdd要持久化persist,

lazy级别,框架发现有checnkpoint,checkpoint时单独触发一个job,需要重算一遍,checkpoint前

要持久化,写个rdd.cache或者rdd.persist,将结果保存起来,再写checkpoint操作,这样执行起来会非常快,不需要重新计算rdd链条了。checkpoint之前一定会进行persist。

4)shuffle之后为什么要persist,shuffle要进性网络传输,风险很大,数据丢失重来,恢复代价很大

5)shuffle之前进行persist,框架默认将数据持久化到磁盘,这个是框架自动做的。

60.为什么要进行序列化

序列化可以减少数据的体积,减少存储空间,高效存储和传输数据,不好的是使用的时候要反序列化,非常消耗CPU

61.介绍一下join操作优化经验?

答:join其实常见的就分为两类: map-side join 和 reduce-side join。当大表和小表join时,用map-side join能显著提高效率。将多份数据进行关联是数据处理过程中非常普遍的用法,不过在分布式计算系统中,这个问题往往会变的非常麻烦,因为框架提供的 join 操作一般会将所有数据根据 key 发送到所有的 reduce 分区中去,也就是 shuffle 的过程。造成大量的网络以及磁盘IO消耗,运行效率极其低下,这个过程一般被称为 reduce-side-join。如果其中有张表较小的话,我们则可以自己实现在 map 端实现数据关联,跳过大量数据进行 shuffle 的过程,运行时间得到大量缩短,根据不同数据可能会有几倍到数十倍的性能提升。

备注:这个题目面试中非常非常大概率见到,务必搜索相关资料掌握,这里抛砖引玉。

62.介绍一下cogroup rdd实现原理,你在什么场景下用过这个rdd?

答:cogroup的函数实现:这个实现根据两个要进行合并的两个RDD操作,生成一个CoGroupedRDD的实例,这个RDD的返回结果是把相同的key中两个RDD分别进行合并操作,最后返回的RDD的value是一个Pair的实例,这个实例包含两个Iterable的值,第一个值表示的是RDD1中相同KEY的值,第二个值表示的是RDD2中相同key的值.由于做cogroup的操作,需要通过partitioner进行重新分区的操作,因此,执行这个流程时,需要执行一次shuffle的操作(如果要进行合并的两个RDD的都已经是shuffle后的rdd,同时他们对应的partitioner相同时,就不需要执行shuffle,),

场景:表关联查询

63下面这段代码输出结果是什么?


def joinRdd(sc:SparkContext) {

val name= Array(

Tuple2(1,"spark"),

Tuple2(2,"tachyon"),

Tuple2(3,"hadoop")

)

val score= Array(

Tuple2(1,100),

Tuple2(2,90),

Tuple2(3,80)

)

val namerdd=sc.parallelize(name);

val scorerdd=sc.parallelize(score);

val result = namerdd.join(scorerdd);

result .collect.foreach(println);

}

--------------------------

答案:

(1,(Spark,100))

(2,(tachyon,90))

(3,(hadoop,80))

文章作者: Leon
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Leon !
评论
  目录