1. Spark 的四大组件下面哪个不是 (D )
A.Spark Streaming B. Mlib
C Graphx D.Spark R
2. 下面哪个端口不是 spark 自带服务的端口 (C )
A.8080 B.4040 C.8090 D.18080
备注:8080:spark集群web ui端口,4040:sparkjob监控端口,18080:jobhistory端口
3. spark 1.4 版本的最大变化 (B )
A spark sql Release 版本 B .引入 Spark R
C DataFrame D.支持动态资源分配
4. Spark Job 默认的调度模式 (A )
A FIFO B FAIR
C 无 D 运行时指定
备注:Spark中的调度模式主要有两种:FIFO和FAIR。默认情况下Spark的调度模式是FIFO(先进先出),谁先提交谁先执行,后面的任务需要等待前面的任务执行。而FAIR(公平调度)模式支持在调度池中为任务进行分组,不同的调度池权重不同,任务可以按照权重来决定执行顺序。使用哪种调度器由参数spark.scheduler.mode来设置,可选的参数有FAIR和FIFO,默认是FIFO。
5.哪个不是本地模式运行的条件 ( D)
A spark.localExecution.enabled=true
B 显式指定本地运行
C finalStage 无父 Stage
D partition默认值
备注:【问题】Spark在windows能跑集群模式吗?
我认为是可以的,但是需要详细了解cmd命令行的写法。目前win下跑spark的单机模式是没有问题的。
【关键点】spark启动机制容易被windows的命令行cmd坑
1、带空格、奇怪字符的安装路径,cmd不能识别。最典型的坑就是安装在Program Files文件夹下的程序,因为Program和Files之间有个空格,所以cmd竟不能识别。之前就把JDK安装在了Program Files下面,然后启动spark的时候,总是提示我找不到JDK。我明明配置了环境变量了啊?这就是所谓了《已经配置环境变量,spark 仍然找不到Java》的错误问题。至于奇怪的字符,如感叹号!,我经常喜欢用来将重要的文件夹排在最前面,但cmd命令提示符不能识别。
2、是否需要配置hadoop的路径的问题——答案是需要用HDFS或者yarn就配,不需要用则不需配置。目前大多数的应用场景里面,Spark大规模集群基本安装在Linux服务器上,而自己用windows跑spark的情景,则大多基于学习或者实验性质,如果我们所要读取的数据文件从本地windows系统的硬盘读取(比如说d:\data\ml.txt),基本上不需要配置hadoop路径。我们都知道,在编spark程序的时候,可以指定spark的启动模式,而启动模式有这么三中(以python代码举例):
(2.1)本地情况,conf = SparkConf().setMaster(“local[*]”) ——>也就是拿本机的spark来跑程序
(2.2)远程情况,conf = SparkConf().setMaster(“spark://remotehost:7077”) ——>远程spark主机
(2.3)yarn情况,conf = SparkConf().setMaster(“yarn-client”) ——>远程或本地 yarn集群代理spark
针对这3种情况,配置hadoop安装路径都有什么作用呢?(2.1)本地的情况,直接拿本机安装的spark来运行spark程序(比如d:\spark-1.6.2),则配不配制hadoop路径取决于是否需要使用hdfs。java程序的情况就更为简单,只需要导入相应的hadoop的jar包即可,是否配置hadoop路径并不重要。(2.2)的情况大体跟(2.1)的情况相同,虽然使用的远程spark,但如果使用本地数据,则运算的元数据也是从本地上传到远程spark集群的,无需配置hdfs。而(2.3)的情况就大不相同,经过我搜遍baidu、google、bing引擎,均没找到SparkConf直接配置远程yarn地址的方法,唯一的一个帖子介绍可以使用yarn://remote:8032的形式,则会报错“无法解析 地址”。查看Spark的官方说明,Spark其实是通过hadoop路径下的etc\hadoop文件夹中的配置文件来寻找yarn集群的。因此,需要使用yarn来运行spark的情况,在spark那配置好hadoop的目录就尤为重要。后期经过虚拟机的验证,表明,只要windows本地配置的host地址等信息与linux服务器端相同(注意应更改hadoop-2/etc/hadoop 下各种文件夹的配置路径,使其与windows本地一致),是可以直接在win下用yarn-client提交spark任务到远程集群的。
3、是否需要配置环境变量的问题,若初次配置,可以考虑在IDE里面配置,或者在程序本身用setProperty函数进行配置。因为配置windows下的hadoop、spark环境是个非常头疼的问题,有可能路径不对而导致无法找到相应要调用的程序。待实验多次成功率提高以后,再直接配置windows的全局环境变量不迟。
4、使用Netbeans这个IDE的时候,有遇到Netbeans不能清理构建的问题。原因,极有可能是导入了重复的库,spark里面含有hadoop包,记得检查冲突。同时,在清理构建之前,记得重新编译一遍程序,再进行清理并构建。
5、经常遇到WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources资源不足无法运行的问题,添加conf.set(“spark.executor.memory”, “512m”);语句进行资源限制。先前在虚拟机跑spark,由于本身机子性能不高,给虚拟机设置的内存仅仅2G,导致hadoop和spark双开之后系统资源严重不足。因此可以缩小每个executor的运算规模。其他资源缺乏问题的解决方法参考http://blog.sina.com.cn/s/blog_4b1452dd0102wyzo.html
6.下面哪个不是 RDD 的特点 (C )
A. 可分区 B 可序列化 C 可修改 D 可持久化
7. 关于广播变量,下面哪个是错误的 (D )
A 任何函数调用 B 是只读的
C 存储在各个节点 D 存储在磁盘或 HDFS
8. 关于累加器,下面哪个是错误的 (D )
A 支持加法 B 支持数值类型
C 可并行 D 不支持自定义类型
9.Spark 支持的分布式部署方式中哪个是错误的 (D )
A standalone B spark on mesos
C spark on YARN D Spark on local
10.Stage 的 Task 的数量由什么决定 (A )
A Partition B Job C Stage D TaskScheduler
11.下面哪个操作是窄依赖 (B )
A join B filter
C group D sort
12.下面哪个操作肯定是宽依赖 (C )
A map B flatMap
C reduceByKey D sample
13.spark 的 master 和 worker 通过什么方式进行通信的? (D )
A http B nio C netty D Akka
备注:从spark1.3.1之后,netty完全代替 了akka
一直以来,基于Akka实现的RPC通信框架是Spark引以为豪的主要特性,也是与Hadoop等分布式计算框架对比过程中一大亮点,但是时代和技术都在演化,从Spark1.3.1版本开始,为了解决大数据块(如shuffle)的传输问题,Spark引入了Netty通信框架,到了1.6.0版本,Netty居然完全取代了Akka,承担Spark内部所有的RPC通信以及数据流传输。
那么Akka又是什么东西?从Akka出现背景来说,它是基于Actor的RPC通信系统,它的核心概念也是Message,它是基于协程的,性能不容置疑;基于scala的偏函数,易用性也没有话说,但是它毕竟只是RPC通信,无法适用大的package/stream的数据传输,这也是Spark早期引入Netty的原因。
那么Netty为什么可以取代Akka?首先不容置疑的是Akka可以做到的,Netty也可以做到,但是Netty可以做到,Akka却无法做到,原因是啥?在软件栈中,Akka相比Netty要Higher一点,它专门针对RPC做了很多事情,而Netty相比更加基础一点,可以为不同的应用层通信协议(RPC,FTP,HTTP等)提供支持,在早期的Akka版本,底层的NIO通信就是用的Netty;其次一个优雅的工程师是不会允许一个系统中容纳两套通信框架,恶心!最后,虽然Netty没有Akka协程级的性能优势,但是Netty内部高效的Reactor线程模型,无锁化的串行设计,高效的序列化,零拷贝,内存池等特性也保证了Netty不会存在性能问题。
那么Spark是怎么用Netty来取代Akka呢?一句话,利用偏函数的特性,基于Netty“仿造”出一个简约版本的Actor模型!!
14. 默认的存储级别 (A )
A MEMORY_ONLY B MEMORY_ONLY_SER
C MEMORY_AND_DISK D MEMORY_AND_DISK_SER
备注:
//不会保存任务数据
val NONE = new StorageLevel(false, false, false, false) //直接将RDD的partition保存在该节点的Disk上
val DISK_ONLY = new StorageLevel(true, false, false, false) //直接将RDD的partition保存在该节点的Disk上,在其他节点上保存一个相同的备份
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2) //将RDD的partition对应的原生的Java Object保存在JVM中,如果RDD太大导致它的部分partition不能存储在内存中 //那么这些partition将不会缓存,并且需要的时候被重新计算,默认缓存的级别
val MEMORY_ONLY = new StorageLevel(false, true, false, true) //将RDD的partition对应的原生的Java Object保存在JVM中,在其他节点上保存一个相同的备份
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false) val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2) //将RDD的partition反序列化后的对象存储在JVM中,如果RDD太大导致它的部分partition不能存储在内存中 //超出的partition将被保存在Disk上,并且在需要时读取
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true) //在其他节点上保存一个相同的备份
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2) val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false) val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) //将RDD的partition序列化后存储在Tachyon中
val OFF_HEAP = new StorageLevel(false, false, true, false)
15 spark.deploy.recoveryMode 不支持那种 (D )
A.ZooKeeper B. FileSystem
D NONE D Hadoop
16.下列哪个不是 RDD 的缓存方法 (C )
A persist() B Cache()
C Memory()
17.Task 运行在下来哪里个选项中 Executor 上的工作单元 (C )
A Driver program B. spark master
C.worker node D Cluster manager
18.hive 的元数据存储在 derby 和 MySQL 中有什么区别 (B )
A.没区别 B.多会话
C.支持网络环境 D数据库的区别
备注: Hive 将元数据存储在 RDBMS 中,一般常用 MySQL 和 Derby。默认情况下,Hive 元数据保存在内嵌的 Derby 数据库中,只能允许一个会话连接,只适合简单的测试。实际生产环境中不适用, 为了支持多用户会话,则需要一个独立的元数据库,使用 MySQL 作为元数据库,Hive 内部对 MySQL 提供了很好的支持。
内置的derby主要问题是并发性能很差,可以理解为单线程操作。
Derby还有一个特性。更换目录执行操作,会找不到相关表等
19.DataFrame 和 RDD 最大的区别 (B )
A.科学统计支持 B.多了 schema
C.存储方式不一样 D.外部数据源支持
备注:
上图直观体现了RDD与DataFrame的区别:左侧的RDD[Person]虽然以Person为类型参数,但Spark框架本身不了解Person类的内部结构。而右侧的DataFrame却提供了详细的结构信息,使得Spark SQL可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么。DataFrame多了数据的结构信息,即schema。RDD是分布式的Java对象的集合。DataFrame是分布式的Row对象的集合。DataFrame除了提供了比RDD更丰富的算子以外,更重要的特点是提升执行效率、减少数据读取以及执行计划的优化,比如filter下推、裁剪等。
提升执行效率: RDD API是函数式的,强调不变性,在大部分场景下倾向于创建新对象而不是修改老对象。这一特点虽然带来了干净整洁的API,却也使得Spark应用程序在运行期倾向于创建大量临时对象,对GC造成压力。在现有RDD API的基础之上,我们固然可以利用mapPartitions方法来重载RDD单个分片内的数据创建方式,用复用可变对象的方式来减小对象分配和GC的开销,但这牺牲了代码的可读性,而且要求开发者对Spark运行时机制有一定的了解,门槛较高。另一方面,Spark SQL在框架内部已经在各种可能的情况下尽量重用对象,这样做虽然在内部会打破了不变性,但在将数据返回给用户时,还会重新转为不可变数据。利用 DataFrame API进行开发,可以免费地享受到这些优化效果。
减少数据读取:分析大数据,最快的方法就是 ——忽略它。这里的“忽略”并不是熟视无睹,而是根据查询条件进行恰当的剪枝。
上文讨论分区表时提到的分区剪 枝便是其中一种——当查询的过滤条件中涉及到分区列时,我们可以根据查询条件剪掉肯定不包含目标数据的分区目录,从而减少IO。
对于一些“智能”数据格 式,Spark SQL还可以根据数据文件中附带的统计信息来进行剪枝。简单来说,在这类数据格式中,数据是分段保存的,每段数据都带有最大值、最小值、null值数量等 一些基本的统计信息。当统计信息表名某一数据段肯定不包括符合查询条件的目标数据时,该数据段就可以直接跳过(例如某整数列a某段的最大值为100,而查询条件要求a > 200)。
此外,Spark SQL也可以充分利用RCFile、ORC、Parquet等列式存储格式的优势,仅扫描查询真正涉及的列,忽略其余列的数据。
为了说明查询优化,我们来看上图展示的人口数据分析的示例。图中构造了两个DataFrame,将它们join之后又做了一次filter操作。如果原封不动地执行这个执行计划,最终的执行效率是不高的。因为join是一个代价较大的操作,也可能会产生一个较大的数据集。如果我们能将filter下推到 join下方,先对DataFrame进行过滤,再join过滤后的较小的结果集,便可以有效缩短执行时间。而Spark SQL的查询优化器正是这样做的。简而言之,逻辑查询计划优化就是一个利用基于关系代数的等价变换,将高成本的操作替换为低成本操作的过程。
得到的优化执行计划在转换成物 理执行计划的过程中,还可以根据具体的数据源的特性将过滤条件下推至数据源内。最右侧的物理执行计划中Filter之所以消失不见,就是因为溶入了用于执行最终的读取操作的表扫描节点内。
对于普通开发者而言,查询优化 器的意义在于,即便是经验并不丰富的程序员写出的次优的查询,也可以被尽量转换为高效的形式予以执行。
RDD和Dataset
DataSet以Catalyst逻辑执行计划表示,并且数据以编码的二进制形式被存储,不需要反序列化就可以执行sorting、shuffle等操作。
DataSet创立需要一个显式的Encoder,把对象序列化为二进制,可以把对象的scheme映射为Spark
SQl类型,然而RDD依赖于运行时反射机制。
DataFrame和Dataset
Dataset可以认为是DataFrame的一个特例,主要区别是Dataset每一个record存储的是一个强类型值而不是一个Row。因此具有如下三个特点:
DataSet可以在编译时检查类型
并且是面向对象的编程接口。
20.Master 的 ElectedLeader 事件后做了哪些操作 (D )
A. 通知 driver B.通知 worker
C.注册 application D.直接 ALIVE
34.cache后面能不能接其他算子,它是不是action操作?
答:cache可以接其他算子,但是接了算子之后,起不到缓存应有的效果,因为会重新触发cache。
cache不是action操作
35.reduceByKey是不是action?
答:不是,很多人都会以为是action,reduce rdd是action
36.数据本地性是在哪个环节确定的?
具体的task运行在那他机器上,dag划分stage的时候确定的
37.RDD的弹性表现在哪几点?
1)自动的进行内存和磁盘的存储切换;
2)基于Lingage的高效容错;
3)task如果失败会自动进行特定次数的重试;
4)stage如果失败会自动进行特定次数的重试,而且只会计算失败的分片;
5)checkpoint和persist,数据计算之后持久化缓存
6)数据调度弹性,DAG TASK调度和资源无关
7)数据分片的高度弹性,a.分片很多碎片可以合并成大的,b.par
38.常规的容错方式有哪几种类型?
1).数据检查点,会发生拷贝,浪费资源
2).记录数据的更新,每次更新都会记录下来,比较复杂且比较消耗性能
39.RDD通过Linage(记录数据更新)的方式为何很高效?
1)lazy记录了数据的来源,RDD是不可变的,且是lazy级别的,且rDD
之间构成了链条,lazy是弹性的基石。由于RDD不可变,所以每次操作就
产生新的rdd,不存在全局修改的问题,控制难度下降,所有有计算链条
将复杂计算链条存储下来,计算的时候从后往前回溯
900步是上一个stage的结束,要么就checkpoint
2)记录原数据,是每次修改都记录,代价很大
如果修改一个集合,代价就很小,官方说rdd是
粗粒度的操作,是为了效率,为了简化,每次都是
操作数据集合,写或者修改操作,都是基于集合的
rdd的写操作是粗粒度的,rdd的读操作既可以是粗粒度的
也可以是细粒度,读可以读其中的一条条的记录。
3)简化复杂度,是高效率的一方面,写的粗粒度限制了使用场景
如网络爬虫,现实世界中,大多数写是粗粒度的场景
40.RDD有哪些缺陷?
1)不支持细粒度的写和更新操作(如网络爬虫),spark写数据是粗粒度的
所谓粗粒度,就是批量写入数据,为了提高效率。但是读数据是细粒度的也就是
说可以一条条的读
2)不支持增量迭代计算,Flink支持
41.说一说Spark程序编写的一般步骤?
答:初始化,资源,数据源,并行化,rdd转化,action算子打印输出结果或者也可以存至相应的数据存储介质,具体的可看下图:
file:///E:/%E5%AE%89%E8%A3%85%E8%BD%AF%E4%BB%B6/%E6%9C%89%E9%81%93%E7%AC%94%E8%AE%B0%E6%96%87%E4%BB%B6/qq19B99AF2399E52F466CC3CF7E3B24ED5/069fa7b471f54e038440faf63233acce/640.webp
42. Spark有哪两种算子?
答:Transformation(转化)算子和Action(执行)算子。
43. Spark提交你的jar包时所用的命令是什么?
答:spark-submit。
44. Spark有哪些聚合类的算子,我们应该尽量避免什么类型的算子?
答:在我们的开发过程中,能避免则尽可能避免使用reduceByKey、join、distinct、repartition等会进行shuffle的算子,尽量使用map类的非shuffle算子。这样的话,没有shuffle操作或者仅有较少shuffle操作的Spark作业,可以大大减少性能开销。
45. 你所理解的Spark的shuffle过程?
答:从下面三点去展开
1)shuffle过程的划分
2)shuffle的中间结果如何存储
3)shuffle的数据如何拉取过来
可以参考这篇博文:http://www.cnblogs.com/jxhd1/p/6528540.html
Shuffle后续优化方向:通过上面的介绍,我们了解到,Shuffle过程的主要存储介质是磁盘,尽量的减少IO是Shuffle的主要优化方向。我们脑海中都有那个经典的存储金字塔体系,Shuffle过程为什么把结果都放在磁盘上,那是因为现在内存再大也大不过磁盘,内存就那么大,还这么多张嘴吃,当然是分配给最需要的了。如果具有“土豪”内存节点,减少Shuffle IO的最有效方式无疑是尽量把数据放在内存中。下面列举一些现在看可以优化的方面,期待经过我们不断的努力,TDW计算引擎运行地更好。
MapReduce Shuffle后续优化方向:压缩:对数据进行压缩,减少写读数据量;
减少不必要的排序:并不是所有类型的Reduce需要的数据都是需要排序的,排序这个nb的过程如果不需要最好还是不要的好;
内存化:Shuffle的数据不放在磁盘而是尽量放在内存中,除非逼不得已往磁盘上放;当然了如果有性能和内存相当的第三方存储系统,那放在第三方存储系统上也是很好的;这个是个大招;
网络框架:netty的性能据说要占优了;
本节点上的数据不走网络框架:对于本节点上的Map输出,Reduce直接去读吧,不需要绕道网络框架。
Spark Shuffle后续优化方向:Spark作为MapReduce的进阶架构,对于Shuffle过程已经是优化了的,特别是对于那些具有争议的步骤已经做了优化,但是Spark的Shuffle对于我们来说在一些方面还是需要优化的。压缩:对数据进行压缩,减少写读数据量;
内存化:Spark历史版本中是有这样设计的:Map写数据先把数据全部写到内存中,写完之后再把数据刷到磁盘上;考虑内存是紧缺资源,后来修改成把数据直接写到磁盘了;对于具有较大内存的集群来讲,还是尽量地往内存上写吧,内存放不下了再放磁盘。
46. 你如何从Kafka中获取数据?
1) 基于Receiver的方式
这种方式使用Receiver来获取数据。Receiver是使用Kafka的高层次Consumer API来实现的。receiver从Kafka中获取的数据都是存储在Spark Executor的内存中的,然后Spark Streaming启动的job会去处理那些数据。
2) 基于Direct的方式
这种新的不基于Receiver的直接方式,是在Spark 1.3中引入的,从而能够确保更加健壮的机制。替代掉使用Receiver来接收数据后,这种方式会周期性地查询Kafka,来获得每个topic+partition的最新的offset,从而定义每个batch的offset的范围。当处理数据的job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据
47. 对于Spark中的数据倾斜问题你有什么好的方案?
1)前提是定位数据倾斜,是OOM了,还是任务执行缓慢,看日志,看WebUI
2)解决方法,有多个方面
· 避免不必要的shuffle,如使用广播小表的方式,将reduce-side-join提升为map-side-join
·分拆发生数据倾斜的记录,分成几个部分进行,然后合并join后的结果
·改变并行度,可能并行度太少了,导致个别task数据压力大
·两阶段聚合,先局部聚合,再全局聚合
·自定义paritioner,分散key的分布,使其更加均匀
详细解决方案参考博文《Spark数据倾斜优化方法》
48.RDD创建有哪几种方式?
1).使用程序中的集合创建rdd
2).使用本地文件系统创建rdd
3).使用hdfs创建rdd,
4).基于数据库db创建rdd
5).基于Nosql创建rdd,如hbase
6).基于s3创建rdd,
7).基于数据流,如socket创建rdd
如果只回答了前面三种,是不够的,只能说明你的水平还是入门级的,实践过程中有很多种创建方式。
49.Spark并行度怎么设置比较合适
答:spark并行度,每个core承载24个partition,如,32个core,那么64128之间的并行度,也就是
设置64~128个partion,并行读和数据规模无关,只和内存使用量和cpu使用
时间有关
50.Spark中数据的位置是被谁管理的?
答:每个数据分片都对应具体物理位置,数据的位置是被blockManager,无论
数据是在磁盘,内存还是tacyan,都是由blockManager管理
答:Spark中的数据本地性有三种:
a.PROCESS_LOCAL是指读取缓存在本地节点的数据
b.NODE_LOCAL是指读取本地节点硬盘数据
c.ANY是指读取非本地节点数据
通常读取数据PROCESS_LOCAL>NODE_LOCAL>ANY,尽量使数据以PROCESS_LOCAL或NODE_LOCAL方式读取。其中PROCESS_LOCAL还和cache有关,如果RDD经常用的话将该RDD cache到内存中,注意,由于cache是lazy的,所以必须通过一个action的触发,才能真正的将该RDD cache到内存中。
52.rdd有几种操作类型?
1)transformation,rdd由一种转为另一种rdd
2)action,
3)cronroller,crontroller是控制算子,cache,persist,对性能和效率的有很好的支持
三种类型,不要回答只有2中操作
53.Spark如何处理不能被序列化的对象?
将不能序列化的内容封装成object
54.collect功能是什么,其底层是怎么实现的?
答:driver通过collect把集群中各个节点的内容收集过来汇总成结果,collect返回结果是Array类型的,collect把各个节点上的数据抓过来,抓过来数据是Array型,collect对Array抓过来的结果进行合并,合并后Array中只有一个元素,是tuple类型(KV类型的)的。
55.Spaek程序执行,有时候默认为什么会产生很多task,怎么修改默认task执行个数?
答:
1)因为输入数据有很多task,尤其是有很多小文件的时候,有多少个输入block就会有多少个task启动;
2)spark中有partition的概念,每个partition都会对应一个task,task越多,在处理大规模数据的时候,就会越有效率。不过task并不是越多越好,如果平时测试,或者数据量没有那么大,则没有必要task数量太多。
3)参数可以通过spark_home/conf/spark-default.conf配置文件设置:
spark.sql.shuffle.partitions 50 spark.default.parallelism 10
第一个是针对spark sql的task数量
第二个是非spark sql程序设置生效
56.为什么Spark Application在没有获得足够的资源,job就开始执行了,可能会导致什么什么问题发生?
答:会导致执行该job时候集群资源不足,导致执行job结束也没有分配足够的资源,分配了部分Executor,该job就开始执行task,应该是task的调度线程和Executor资源申请是异步的;如果想等待申请完所有的资源再执行job的:需要将spark.scheduler.maxRegisteredResourcesWaitingTime设置的很大;spark.scheduler.minRegisteredResourcesRatio 设置为1,但是应该结合实际考虑
否则很容易出现长时间分配不到资源,job一直不能运行的情况。
57.map与flatMap的区别
map:对RDD每个元素转换,文件中的每一行数据返回一个数组对象
flatMap:对RDD每个元素转换,然后再扁平化
将所有的对象合并为一个对象,文件中的所有行数据仅返回一个数组
对象,会抛弃值为null的值
58.列举你常用的action?
collect,reduce,take,count,saveAsTextFile等
59.Spark为什么要持久化,一般什么场景下要进行persist操作?
为什么要进行持久化?
spark所有复杂一点的算法都会有persist身影,spark默认数据放在内存,spark很多内容都是放在内存的,非常适合高速迭代,1000个步骤
只有第一个输入数据,中间不产生临时数据,但分布式系统风险很高,所以容易出错,就要容错,rdd出错或者分片可以根据血统算出来,如果没有对父rdd进行persist 或者cache的化,就需要重头做。
以下场景会使用persist
1)某个步骤计算非常耗时,需要进行persist持久化
2)计算链条非常长,重新恢复要算很多步骤,很好使,persist
3)checkpoint所在的rdd要持久化persist,
lazy级别,框架发现有checnkpoint,checkpoint时单独触发一个job,需要重算一遍,checkpoint前
要持久化,写个rdd.cache或者rdd.persist,将结果保存起来,再写checkpoint操作,这样执行起来会非常快,不需要重新计算rdd链条了。checkpoint之前一定会进行persist。
4)shuffle之后为什么要persist,shuffle要进性网络传输,风险很大,数据丢失重来,恢复代价很大
5)shuffle之前进行persist,框架默认将数据持久化到磁盘,这个是框架自动做的。
60.为什么要进行序列化
序列化可以减少数据的体积,减少存储空间,高效存储和传输数据,不好的是使用的时候要反序列化,非常消耗CPU
61.介绍一下join操作优化经验?
答:join其实常见的就分为两类: map-side join 和 reduce-side join。当大表和小表join时,用map-side join能显著提高效率。将多份数据进行关联是数据处理过程中非常普遍的用法,不过在分布式计算系统中,这个问题往往会变的非常麻烦,因为框架提供的 join 操作一般会将所有数据根据 key 发送到所有的 reduce 分区中去,也就是 shuffle 的过程。造成大量的网络以及磁盘IO消耗,运行效率极其低下,这个过程一般被称为 reduce-side-join。如果其中有张表较小的话,我们则可以自己实现在 map 端实现数据关联,跳过大量数据进行 shuffle 的过程,运行时间得到大量缩短,根据不同数据可能会有几倍到数十倍的性能提升。
备注:这个题目面试中非常非常大概率见到,务必搜索相关资料掌握,这里抛砖引玉。
62.介绍一下cogroup rdd实现原理,你在什么场景下用过这个rdd?
答:cogroup的函数实现:这个实现根据两个要进行合并的两个RDD操作,生成一个CoGroupedRDD的实例,这个RDD的返回结果是把相同的key中两个RDD分别进行合并操作,最后返回的RDD的value是一个Pair的实例,这个实例包含两个Iterable的值,第一个值表示的是RDD1中相同KEY的值,第二个值表示的是RDD2中相同key的值.由于做cogroup的操作,需要通过partitioner进行重新分区的操作,因此,执行这个流程时,需要执行一次shuffle的操作(如果要进行合并的两个RDD的都已经是shuffle后的rdd,同时他们对应的partitioner相同时,就不需要执行shuffle,),
场景:表关联查询
63下面这段代码输出结果是什么?
def joinRdd(sc:SparkContext) {
val name= Array(
Tuple2(1,"spark"),
Tuple2(2,"tachyon"),
Tuple2(3,"hadoop")
)
val score= Array(
Tuple2(1,100),
Tuple2(2,90),
Tuple2(3,80)
)
val namerdd=sc.parallelize(name);
val scorerdd=sc.parallelize(score);
val result = namerdd.join(scorerdd);
result .collect.foreach(println);
}
--------------------------
答案:
(1,(Spark,100))
(2,(tachyon,90))
(3,(hadoop,80))