目录
[TOC]
一. Hadoop篇
1. 并行计算模型MapReduce
1.1 MapReduce 的工作原理
MapReduce是一个基于集群的计算平台,是一个简化分布式编程的计算框架,是一个将分布式计算抽象为Map和Reduce两个阶段的编程模型。(这句话记住了是可以用来装逼的)
执行步骤:切片>分词>映射>分区>排序>聚合>shuffle>reduce
1)Map()阶段
读取HDFS中的文件。每一行解析成一个<k,v>。每一个键值对调用一次map函数
重写map(),对第一步产生的<k,v>进行处理,转换为新的<k,v>输出
对输出的key、value进行分区
对不同分区的数据,按照key进行排序、分组。相同key的value放到一个集合中
2)Reduce阶段
- 多个map任务的输出,按照不同的分区,通过网络复制到不同的reduce节点上
- 对多个map的输出进行合并、排序。
- 重写reduce函数实现自己的逻辑,对输入的key、value处理,转换成新的key、value输出
- 把reduce的输出保存到文件中
特别说明:
切片 不属于map阶段,但却是map阶段的输入,是集群对输入数据的解析处理
分词,映射,分区,排序,聚合 都属map阶段
混洗 横跨map阶段和reduce阶段,其发生在map阶段的输出和reduce的输入阶段
规约 属reduce阶段 规约结果是reduce阶段的输出,输出格式由集群默认或用户自定义
分词即map()函数的输入与map阶段的输入略有差别,他的输入是切片结果的kv形式,行号(偏移量)与行内容
补充
切片:HDFS 以固定大小的block 为基本单位存储数据,而对于MapReduce 而言,其处理单位是split。split 是一个逻辑概念,它只包含一些元数据信息,比如数据起始位置、数据长度、数据所在节点等。它的划分方法完全由用户自己决定。
Map任务的数量:Hadoop为每个split创建一个Map任务,split 的多少决定了Map任务的数目。大多数情况下,理想的分片大小是一个HDFS块
Reduce任务的数量: 最优的Reduce任务个数取决于集群中可用的reduce任务槽(slot)的数目 通常设置比reduce任务槽数目稍微小一些的Reduce任务个数(这样可以预留一些系统资源处理可能发生的错误)
1.2 MapReduce中shuffle工作流程及优化
shuffle主要功能是把map task的输出结果有效地传送到reduce端。
简单些可以这样说,每个map task都有一个内存缓冲区,存储着map的输出结果,当缓冲区快满的时候需要将缓冲区的数据以一个临时文件的方式存放到磁盘,当整个map task结束后再对磁盘中这个map task产生的所有临时文件做合并,生成最终的正式输出文件,然后等待reduce task来拉数据。
前奏:
1. 在map task执行时,它的输入数据来源于HDFS的block,当然在MapReduce概念中,map task只读取split。Split与block的对应关系可能是多对一,默认是一对一。
2. 在经过mapper的运行后,我们得知mapper的输出是这样一个key/value对: key是“aaa”, value是数值1。因为当前map端只做加1的操作,在reduce task里才去合并结果集。前面我们知道这个job有3个reduce task,到底当前的“aaa”应该交由哪个reduce去做呢,是需要现在决定的。
主要工作流程map端分区,排序,溢写,拷贝,reduce端合并
1)Map端shuffle
分区Partition
MapReduce提供Partitioner接口,它的作用就是根据key或value及reduce的数量来决定当前的这对输出数据最终应该交由哪个reduce task处理。默认对key hash后再以reduce task数量取模。默认的取模方式只是为了平均reduce的处理能力,如果用户自己对Partitioner有需求,可以订制并设置到job上。
写入内存缓冲区: 在我们的例子中,“aaa”经过Partitioner后返回0,也就是这对值应当交由第一个reducer来处理。接下来,需要将数据写入内存缓冲区中,缓冲区的作用是批量收集map结果,减少磁盘IO的影响。我们的key/value对以及Partition的结果都会被写入缓冲区。当然写入之前,key与value值都会被序列化成字节数组。 这个内存缓冲区是有大小限制的,默认是100MB。当map task的输出结果很多时,就可能会撑爆内存,所以需要在一定条件下将缓冲区中的数据临时写入磁盘,然后重新利用这块缓冲区。这个从内存往磁盘写数据的过程被称为Spill,中文可译为溢写,字面意思很直观。
溢写Spill: 这个溢写是由单独线程来完成,不影响往缓冲区写map结果的线程。溢写线程启动时不应该阻止map的结果输出,所以整个缓冲区有个溢写的比例spill.percent。这个比例默认是0.8,也就是当缓冲区的数据已经达到阈值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程。Map task的输出结果还可以往剩下的20MB内存中写,互不影响。
排序Sort: 当溢写线程启动后,需要对这80MB空间内的key做排序(Sort)。排序是MapReduce模型默认的行为,这里的排序也是对序列化的字节做的排序。
合并Map端:在这里我们可以想想,因为map task的输出是需要发送到不同的reduce端去,而内存缓冲区没有对将发送到相同reduce端的数据做合并,那么这种合并应该是体现是磁盘文件中的。从官方图上也可以看到写到磁盘中的溢写文件是对不同的reduce端的数值做过合并。所以溢写过程一个很重要的细节在于,如果有很多个key/value对需要发送到某个reduce端去,那么需要将这些key/value值拼接到一块,减少与partition相关的索引记录。
CombineReduce端:在针对每个reduce端而合并数据时,有些数据可能像这样:“aaa”/1, “aaa”/1。对于WordCount例子,就是简单地统计单词出现的次数,如果在同一个map task的结果中有很多个像“aaa”一样出现多次的key,我们就应该把它们的值合并到一块,这个过程叫reduce也叫combine。但MapReduce的术语中,reduce只指reduce端执行从多个map task取数据做计算的过程。除reduce外,非正式地合并数据只能算做combine了。其实大家知道的,MapReduce中将Combiner等同于Reducer。
2)Reduce端shuffle
1. Copy过程,简单地拉取数据。Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求map task所在的TaskTracker获取map task的输出文件。因为map task早已结束,这些文件就归TaskTracker管理在本地磁盘中。
2. Merge阶段。这里的merge如map端的merge动作,只是数组中存放的是不同map端copy来的数值。Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活,它基于JVM的heap size设置,因为Shuffle阶段Reducer不运行,所以应该把绝大部分的内存都给Shuffle用。
增加combiner,压缩溢写的文件。
3. Reducer的输入文件。不断地merge后,最后会生成一个“最终文件”。为什么加引号?因为这个文件可能存在于磁盘上,也可能存在于内存中。对我们来说,当然希望它存放于内存中,直接作为Reducer的输入,但默认情况下,这个文件是存放于磁盘中的。至于怎样才能让这个文件出现在内存中,之后的性能优化篇我再说。当Reducer的输入文件已定,整个Shuffle才最终结束。然后就是Reducer执行,把结果放到HDFS上。
3)shuffle优化
压缩:对数据进行压缩,减少写读数据量;
减少不必要的排序:并不是所有类型的Reduce需要的数据都是需要排序的,排序这个nb的过程如果不需要最好还是不要的好;
内存化:Shuffle的数据不放在磁盘而是尽量放在内存中,除非逼不得已往磁盘上放;当然了如果有性能和内存相当的第三方存储系统,那放在第三方存储系统上也是很好的;这个是个大招;
补充
1. Map端
1) io.sort.mb
用于map输出排序的内存缓冲区大小
类型:Int
默认:100mb
备注:如果能估算map输出大小,就可以合理设置该值来尽可能减少溢出写的次数,这对调优很有帮助。
2) io.sort.spill.percent
map输出排序时的spill阀值(即使用比例达到该值时,将缓冲区中的内容spill 到磁盘)
类型:float
默认:0.80
3) io.sort.factor
归并因子(归并时的最多合并的流数),map、reduce阶段都要用到
类型:Int
默认:10
备注:将此值增加到100是很常见的。
4) min.num.spills.for.combine
运行combiner所需的最少溢出写文件数(如果已指定combiner)
类型:Int
默认:3
5) mapred.compress.map.output
map输出是否压缩
类型:Boolean
默认:false
备注:如果map输出的数据量非常大,那么在写入磁盘时压缩数据往往是个很好的主意,因为这样会让写磁盘的速度更快,节约磁盘空间,并且减少传给reducer的数据量。
6) mapred.map.output.compression.codec
用于map输出的压缩编解码器
类型:Classname
默认:org.apache.hadoop.io.compress.DefaultCodec
备注:推荐使用LZO压缩。Intel内部测试表明,相比未压缩,使用LZO压缩的 TeraSort作业,运行时间减少60%,且明显快于Zlib压缩。
7) tasktracker.http.threads
每个tasktracker的工作线程数,用于将map输出到reducer。
(注:这是集群范围的设置,不能由单个作业设置)
类型:Int
默认:40
备注:tasktracker开http服务的线程数。用于reduce拉取map输出数据,大集群可以将其设为40~50。
2. reduce端
1) mapred.reduce.slowstart.completed.maps
调用reduce之前,map必须完成的最少比例
类型:float
默认:0.05
2) mapred.reduce.parallel.copies
reducer在copy阶段同时从mapper上拉取的文件数
类型:int
默认:5
3) mapred.job.shuffle.input.buffer.percent
在shuffle的复制阶段,分配给map输出的缓冲区占堆空间的百分比
类型:float
默认:0.70
4) mapred.job.shuffle.merge.percent
map输出缓冲区(由mapred.job.shuffle.input.buffer.percent定义)使用比例阀值,当达到此阀值,缓冲区中的数据将会被归并然后spill 到磁盘。
类型:float
默认:0.66
5) mapred.inmem.merge.threshold
map输出缓冲区中文件数
类型:int
默认:1000
备注:0或小于0的数意味着没有阀值限制,溢出写将有mapred.job.shuffle.merge.percent单独控制。
6) mapred.job.reduce.input.buffer.percent
在reduce过程中,在内存中保存map输出的空间占整个堆空间的比例。
类型:float
默认:0.0
备注:reduce阶段开始时,内存中的map输出大小不能大于该值。默认情况下,在reduce任务开始之前,所有的map输出都合并到磁盘上,以便为reducer提供尽可能多的内存。然而,如果reducer需要的内存较少,则可以增加此值来最小化访问磁盘的次数,以提高reduce性能。
4. 数据仓库工具Hive
4.1 Hive 和普通关系型数据库的区别
- Hive和关系型数据库存储文件的系统不同, Hive使用的是HDFS(Hadoop的分布式文件系统),关系型数据则是服务器本地的文件系统。
- Hive使用的计算模型是MapReduce,而关系型数据库则是自己设计的计算模型.
- 关系型数据库都是为实时查询业务设计的,而Hive则是为海量数据做挖掘而设计的,实时性差;实时性的区别导致Hive的应用场景和关系型数据库有很大区别。
- Hive很容易扩展自己的存储能力和计算能力,这几是继承Hadoop的,而关系型数据库在这方面要比Hive差很多。
4.2 Hive内部表和外部表的区别
- 创建表时:创建内部表时,会将数据移动到数据仓库指向的路径;若创建外部表,仅记录数据所在的路径, 不对数据的位置做任何改变。
- 删除表时:在删除表的时候,内部表的元数据和数据会被一起删除, 而外部表只删除元数据,不删除数据。这样外部表相对来说更加安全些,数据组织也更加灵活,方便共享源数据。
4.3 Hive分区表和分桶表的区别
分区在HDFS上的表现形式是一个目录, 分桶是一个单独的文件
分区: 细化数据管理,直接读对应目录,缩小mapreduce程序要扫描的数据量
分桶: 1、提高join查询的效率(用分桶字段做连接字段)2、提高采样的效率
4.4 Hive 支持哪些数据格式
可支持Text,SequenceFile,ParquetFile,ORC,RCFILE等
补充
TextFile: TextFile文件不支持块压缩,默认格式,数据不做压缩,磁盘开销大,数据解析开销大。这边不做深入介绍。
RCFile: Record Columnar的缩写。是Hadoop中第一个列文件格式。能够很好的压缩和快速的查询性能,但是不支持模式演进。通常写操作比较慢,比非列形式的文件格式需要更多的内存空间和计算量。 RCFile是一种行列存储相结合的存储方式。首先,其将数据按行分块,保证同一个record在一个块上,避免读一个记录需要读取多个block。其次,块数据列式存储,有利于数据压缩和快速的列存取。
ORCFile: 存储方式:数据按行分块 每块按照列存储 ,压缩快 快速列存取,效率比rcfile高,是rcfile的改良版本,相比RC能够更好的压缩,能够更快的查询,但还是不支持模式演进。
Parquet: Parquet能够很好的压缩,有很好的查询性能,支持有限的模式演进。但是写速度通常比较慢。这中文件格式主要是用在Cloudera Impala上面的。
性能对比
- 读操作
存储效率
4.5 Hive元数据库作用及存储内容
本质上只是用来存储hive中有哪些数据库,哪些表,表的模式,目录,分区,索引以及命名空间。为数据库创建的目录一般在hive数据仓库目录下
4.6 HiveSQL 支持的几种排序区别
1)Order By:全局排序,只有一个Reducer
使用 ORDER BY 子句排序
ASC(ascend): 升序(默认)
DESC(descend): 降序
ORDER BY 子句在SELECT语句的结尾
案例实操
查询员工信息按工资升序排列
hive (default)> select * from emp order by sal;
查询员工信息按工资降序排列
hive (default)> select * from emp order by sal desc;
2)Sort By:每个MapReduce内部排序
Sort By:对于大规模的数据集order by的效率非常低。在很多情况下,并不需要全局排序,此时可以使用sort by。Sort by为每个reducer产生一个排序文件。每个Reducer内部进行排序,对全局结果集来说不是排序。
- 设置reduce个数
hive (default)> set mapreduce.job.reduces=3;
- 查看设置reduce个数
hive (default)> set mapreduce.job.reduces;
- 根据部门编号降序查看员工信息
hive (default)> select * from emp sort by deptno desc;
- 将查询结果导入到文件中(按照部门编号降序排序)
hive (default)> insert overwrite local directory '/opt/module/datas/sortby-result'
select * from emp sort by deptno desc;
3)Distribute By:分区排序
Distribute By: 在有些情况下,我们需要控制某个特定行应该到哪个reducer,通常是为了进行后续的聚集操作。distribute by 子句可以做这件事。distribute by类似MR中partition(自定义分区),进行分区,结合sort by使用。 对于distribute by进行测试,一定要分配多reduce进行处理,否则无法看到distribute by的效果。
案例实操:
- 先按照部门编号分区,再按照员工编号降序排序。
hive (default)> set mapreduce.job.reduces=3;
hive (default)> insert overwrite local directory '/opt/module/datas/distribute-result' select * from emp distribute by deptno sort by empno desc;
注意:
distribute by的分区规则是根据分区字段的hash码与reduce的个数进行模除后,余数相同的分到一个区。
Hive要求DISTRIBUTE BY语句要写在SORT BY语句之前。
4)Cluster By
当distribute by和sorts by字段相同时,可以使用cluster by方式。cluster by除了具有distribute by的功能外还兼具sort by的功能。但是排序只能是升序排序,不能指定排序规则为ASC或者DESC。
- 以下两种写法等价
hive (default)> select * from emp cluster by deptno;
hive (default)> select * from emp distribute by deptno sort by deptno;
--注意:按照部门编号分区,不一定就是固定死的数值,可以是20号和30号部门分到一个分区里面去。
--cluster by :sort by 和 distribute by的组合
4.7 Hive 的动态分区
往hive分区表中插入数据时,如果需要创建的分区很多,比如以表中某个字段进行分区存储,则需要复制粘贴修改很多sql去执行,效率低。因为hive是批处理系统,所以hive提供了一个动态分区功能,其可以基于查询参数的位置去推断分区的名称,从而建立分区。
使用动态分区表必须配置的参数
set hive.exec.dynamic.partition =true
(默认false),表示开启动态分区功能;set hive.exec.dynamic.partition.mode = nonstrict
(默认strict),表示允许所有分区都是动态的,否则必须有静态分区字段;
动态分区相关调优参数
set hive.exec.max.dynamic.partitions.pernode=100
(默认100,一般可以设置大一点,比如1000); 表示每个maper或reducer可以允许创建的最大动态分区个数,默认是100,超出则会报错。set hive.exec.max.dynamic.partitions =1000
(默认值) ; 表示一个动态分区语句可以创建的最大动态分区个数,超出报错;set hive.exec.max.created.files =10000
(默认) 全局可以创建的最大文件个数,超出报错。
4.8 Hive MapJoin
MapJoin是Hive的一种优化操作,其适用于小表JOIN大表的场景,由于表的JOIN操作是在Map端且在内存进行的,所以其并不需要启动Reduce任务也就不需要经过shuffle阶段,从而能在一定程度上节省资源提高JOIN效率
使用
方法一:
在Hive0.11前,必须使用MAPJOIN来标记显示地启动该优化操作,由于其需要将小表加载进内存所以要注意小表的大小
SELECT /*+ MAPJOIN(smalltable)*/ .key,value
FROM smalltable JOIN bigtable ON smalltable.key = bigtable.key
方法二:
在Hive0.11后,Hive默认启动该优化,也就是不在需要显示的使用MAPJOIN标记,其会在必要的时候触发该优化操作将普通JOIN转换成MapJoin,可以通过以下两个属性来设置该优化的触发时机
hive.auto.convert.join
默认值为true,自动开启MAPJOIN优化
hive.mapjoin.smalltable.filesize
默认值为2500000(25M),通过配置该属性来确定使用该优化的表的大小,如果表的大小小于此值就会被加载进内存中
注意:使用默认启动该优化的方式如果出现默名奇妙的BUG(比如MAPJOIN并不起作用),就将以下两个属性置为fase手动使用MAPJOIN标记来启动该优化
hive.auto.convert.join=false(关闭自动MAPJOIN转换操作)
hive.ignore.mapjoin.hint=false(不忽略MAPJOIN标记)
对于以下查询是不支持使用方法二(MAPJOIN标记)来启动该优化的
select /*+MAPJOIN(smallTableTwo)*/ idOne, idTwo, value FROM
( select /*+MAPJOIN(smallTableOne)*/ idOne, idTwo, value FROM
bigTable JOIN smallTableOne on (bigTable.idOne = smallTableOne.idOne)
) firstjoin
JOIN
smallTableTwo ON (firstjoin.idTwo = smallTableTwo.idTwo)
但是,如果使用的是方法一即没有MAPJOIN标记则以上查询语句将会被作为两个MJ执行,进一步的,如果预先知道表大小是能够被加载进内存的,则可以通过以下属性来将两个MJ合并成一个MJ
hive.auto.convert.join.noconditionaltask:Hive在基于输入文件大小的前提下将普通JOIN转换成MapJoin,并是否将多个MJ合并成一个
hive.auto.convert.join.noconditionaltask.size:多个MJ合并成一个MJ时,其表的总的大小须小于该值,同时hive.auto.convert.join.noconditionaltask必须为true
4.9 HQL 和 SQL 有哪些常见的区别
总体一致:Hive-sql与SQL基本上一样,因为当初的设计目的,就是让会SQL不会编程MapReduce的也能使用Hadoop进行处理数据。
区别:Hive没有delete和update。
Hive不支持等值连接
--SQL中对两表内联可以写成: select * from dual a,dual b where a.key = b.key; --Hive中应为 select * from dual a join dual b on a.key = b.key; --而不是传统的格式: SELECT t1.a1 as c1, t2.b1 as c2FROM t1, t2 WHERE t1.a2 = t2.b2
分号字符
--分号是SQL语句结束标记,在HiveQL中也是,但是在HiveQL中,对分号的识别没有那么智慧,例如: select concat(key,concat(';',key)) from dual; --但HiveQL在解析语句时提示: FAILED: Parse Error: line 0:-1 mismatched input '<EOF>' expecting ) in function specification --解决的办法是,使用分号的八进制的ASCII码进行转义,那么上述语句应写成: select concat(key,concat('\073',key)) from dual;
IS [NOT] NULL
--SQL中null代表空值, 值得警惕的是, --在HiveQL中String类型的字段若是空(empty)字符串, 即长度为0, 那么对它进行IS NULL的判断结果是False.
Hive不支持将数据插入现有的表或分区中
hive不支持INSERT INTO 表 Values(), UPDATE, DELETE操作
hive支持嵌入mapreduce程序,来处理复杂的逻辑
如:FROM ( 1. MAP doctext USING 'python wc_mapper.py' AS (word, cnt) 2. FROM docs 3. CLUSTER BY word 4. ) a 5. REDUCE word, cnt USING 'python wc_reduce.py'; --doctext: 是输入 --word, cnt: 是map程序的输出 --CLUSTER BY: 将wordhash后,又作为reduce程序的输入并且map程序、reduce程序可以单独使用,如: 1. FROM ( 2. FROM session_table 3. SELECT sessionid, tstamp, data 4. DISTRIBUTE BY sessionid SORT BY tstamp 5. ) a 6. REDUCE sessionid, tstamp, data USING 'session_reducer.sh'; --DISTRIBUTE BY: 用于给reduce程序分配行数据
4.10 Hive开窗函数
假设有如下表格(loan)。表中包含贷款人的唯一标识,贷款日期,以及贷款金额。
1. SUM(), MIN(),MAX(),AVG()等聚合函数,可以直接使用 over() 进行分区计算。
SELECT *,
/*前三次贷款的金额之和*/
SUM(amount) OVER (PARTITION BY name ORDER BY orderdate ROWS BETWEEN 3 PRECEDING AND CURRENT ROW) AS pv1,
/*历史所有贷款 累加到下一次贷款 的金额之和*/
SUM(amount) OVER (PARTITION BY name ORDER BY orderdate ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) AS pv2
FROM loan ;
其中,窗口函数over()使得聚合函数sum()可以在限定的窗口中进行聚合。本例子中,第一条语句计算每个人当前记录的前三条贷款金额之和。第二条语句计算截至到下一次贷款,客户贷款的总额。
窗口的限定语法为:ROWS BETWEEN 一个时间点 AND 一个时间点。时间节点可以使用:
- n PRECEDING : 前n行 n preceding
- n FOLLOWING:后n行
- CURRENT ROW : 当前行
如果不想限制具体的行数,可以将 n 替换为 UNBOUNDED.比如从起始到当前,可以写为:
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW.
窗口函数over()和group by 的最大区别,在于group by之后其余列也必须按照此分区进行计算,而over()函数使得单个特征可以进行分区。
2. NTILE(), ROW_NUMBER(), RANK(), DENSE_RANK(),可以为数据集新增加序列号。
SELECT *,
#将数据按name切分成10区,并返回属于第几个分区
NTILE(10) OVER (PARTITION BY name ORDER BY orderdate) AS f1,
#将数据按照name分区,并按照orderdate排序,返回排序序号
ROW_NUMBER() OVER (PARTITION BY name ORDER BY orderdate) AS f2,
#将数据按照name分区,并按照orderdate排序,返回排序序号
RANK() OVER (PARTITION BY name ORDER BY orderdate) AS f3,
#将数据按照name分区,并按照orderdate排序,返回排序序号
DENSE_RANK() OVER (PARTITION BY name ORDER BY orderdate) AS f4
FROM loan;
其中第一个函数NTILE(10)是将数据按name切分成10区,并返回属于第几个分区。
可以看成是:它把有序的数据集合 平均分配 到 指定的数量(num)个桶中, 将桶号分配给每一行。如果不能平均分配,则优先分配较小编号的桶,并且各个桶中能放的行数最多相差1。
语法是:
ntile (num) over ([partition_clause] order_by_clause) as your_bucket_num然后可以根据桶号,选取前或后 n分之几的数据。
后面的三个函数的功能看起来很相似。区别在于当数据中出现相同值得时候,如何编号。
- ROW_NUMBER()返回的是一列连续的序号。
- RANK()对于数值相同的这一项会标记为相同的序号,而下一个序号跳过。比如{4,5,6}变成了{4,4,6}.
- DENSE_RANK()对于数值相同的这一项,也会标记为相同的序号,但下一个序号并不会跳过。比如{4,5,6}变成了{4,4,5}.
3. LAG(), LEAD(), FIRST_VALUE(), LAST_VALUE()函数返回一系列指定的点
SELECT *,
#取上一笔贷款的日期,缺失默认填NULL
LAG(orderdate, 1) OVER(PARTITION BY name ORDER BY orderdate) AS last_dt,
#取下一笔贷款的日期,缺失指定填'1970-1-1'
LEAD(orderdate, 1,'1970-1-1') OVER(PARTITION BY name ORDER BY orderdate) AS next_dt,
#取最早一笔贷款的日期
FIRST_VALUE(orderdate) OVER(PARTITION BY name ORDER BY orderdate) AS first_dt,
#取新一笔贷款的日期
LAST_VALUE(orderdate) OVER(PARTITION BY name ORDER BY orderdate) AS latest_dt
FROM loan;
LAG(n)将数据向前错位 n 行。LEAD(n)将数据向后错位 n 行。FIRST_VALUE()取当前分区中的第一个值。 LAST_VALUE()取当前分区最后一个值。注意:这四个函数取出的都是某个字段,不是整条记录
4. GROUPING SET(),with CUBE, with ROLLUP 对 group by 进行限制
SELECT
A,B,C
FROM loan
#分别按照月份和日进行分区
GROUP BY substring(orderdate,1,7),orderdate
GROUPING SETS(substring(orderdate,1,7), orderdate)
ORDER BY GROUPING__ID;
GROUPING__ID是GROUPING_SET()的操作之后自动生成的。生成GROUPING__ID是为了区分每条输出结果是属于哪一个group by的数据。它是根据group by后面声明的顺序字段,是否存在于当前group by中的一个二进制位组合数据。GROUPING SETS()必须先做GROUP BY操作。
比如(A,C)的group_id: group_id(A,C) = grouping(A)+grouping(B)+grouping (C) 的结果就是:二进制:101 也就是5.
如果解释器发现group by A,C 但是select A,B,C 那么运行时会将所有from 表取出的结果复制一份,B都置为null,也就是在结果中,B都为null.
SELECT
A,B,C
FROM loan
#分别按照月份和日进行分区
GROUP BY substring(orderdate,1,7),orderdate
with CUBE
ORDER BY GROUPING__ID;
with CUBE 和GROUPING_SET()的区别就是,with CUBE 返回的是group by 字段的笛卡尔积。
SELECT
A,B,C
FROM loan
#分别按照月份和日进行分区
GROUP BY substring(orderdate,1,7),orderdate
with ROLLUP
ORDER BY GROUPING__ID;
with ROLLUP则不会产生第二列为键的聚合结果,在本例子中,只按照 substring(orderdate,1,7)进行展示。所以使用with ROLLUP时,要注意group by 后面字段的顺序。
4.11 HiveUDF UDAF UDTF函数
UDF:一进一出
实现方法:
继承UDF类
重写evaluate方法
将该java文件编译成jar
在终端输入如下命令:
hive> add jar test.jar; hive> create temporary function function_name as 'com.hrj.hive.udf.UDFClass'; hive> select function_name(t.col1) from table t; hive> drop temporary function function_name;
UDAF:多进一出
实现方法:
用户的UDAF必须继承了
org.apache.hadoop.hive.ql.exec.UDAF;
用户的UDAF必须包含至少一个实现了
org.apache.hadoop.hive.ql.exec
的静态类,诸如实现了 UDAFEvaluator一个计算函数必须实现的5个方法的具体含义如下:
- init():主要是负责初始化计算函数并且重设其内部状态,一般就是重设其内部字段。一般在静态类中定义一个内部字段来存放最终的结果。
- iterate():每一次对一个新值进行聚集计算时候都会调用该方法,计算函数会根据聚集计算结果更新内部状态。当输 入值合法或者正确计算了,则就返回true。
- terminatePartial():Hive需要部分聚集结果的时候会调用该方法,必须要返回一个封装了聚集计算当前状态的对象。
- merge():Hive进行合并一个部分聚集和另一个部分聚集的时候会调用该方法。
- terminate():Hive最终聚集结果的时候就会调用该方法。计算函数需要把状态作为一个值返回给用户。
部分聚集结果的数据类型和最终结果的数据类型可以不同。
UDTF:一进多出
实现方法:
继承
org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
initialize():UDTF首先会调用initialize方法,此方法返回UDTF的返回行的信息(返回个数,类型)
process:初始化完成后,会调用process方法,真正的处理过程在process函数中,在process中,每一次forward() 调用产生一行;如果产生多列 可以将多个列的值放在一个数组中,然后将该数组传入到forward()函数
最后close()方法调用,对需要清理的方法进行清理
4.12 Hive数据倾斜问题
0. 什么是数据倾斜
对于集群系统,一般缓存是分布式的,即不同节点负责一定范围的缓存数据。我们把缓存数据分散度不够,导致大量的缓存数据集中到了一台或者几台服务节点上,称为数据倾斜。一般来说数据倾斜是由于负载均衡实施的效果不好引起的。
来源百度百科
对于数据计算过程来说,数据倾斜指的是,并行处理的数据集中,某一部分(如Spark或Kafka的一个Partition)的数据显著多于其它部分,从而使得该部分的处理速度成为整个数据集处理的瓶颈。
1. 数据倾斜的现象
多数task执行速度较快,少数task执行时间非常长,或者等待很长时间后提示你内存不足,执行失败。
2. 数据倾斜的影响
1)数过多的数据在同一个task中执行,将会把executor撑爆,造成OOM,程序终止运行。,据倾斜直接会导致一种情况:Out Of Memory。
2)运行速度慢 ,spark中一个stage的执行时间受限于最后那个执行完的task,因此运行缓慢的任务会拖累整个程序的运行速度(分布式程序运行的速度是由最慢的那个task决定的)。要是发生在Shuffle阶段。同样Key的数据条数太多了。导致了某个key(下图中的80亿条)所在的Task数据量太大了。远远超过其他Task所处理的数据量。
一个经验结论是:一般情况下,OOM的原因都是数据倾斜\
3. 如何定位数据倾斜
数据倾斜一般会发生在shuffle过程中。很大程度上是你使用了可能会触发shuffle操作的算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。
原因: 查看任务-》查看Stage-》查看代码
某个task执行特别慢的情况
某个task莫名其妙内存溢出的情况
查看导致数据倾斜的key的数据分布情况
也可从以下几种情况考虑:
1、是不是有OOM情况出现,一般是少数内存溢出的问题
2、是不是应用运行时间差异很大,总体时间很长
3、需要了解你所处理的数据Key的分布情况,如果有些Key有大量的条数,那么就要小心数据倾斜的问题
4、一般需要通过Spark Web UI和其他一些监控方式出现的异常来综合判断
5、看看代码里面是否有一些导致Shuffle的算子出现
4. 数据倾斜的几种典型情况(重点)
- 数据源中的数据分布不均匀,Spark需要频繁交互
- 数据集中的不同Key由于分区方式,导致数据倾斜
- JOIN操作中,一个数据集中的数据分布不均匀,另一个数据集较小(主要)
- 聚合操作中,数据集中的数据分布不均匀(主要)
- JOIN操作中,两个数据集都比较大,其中只有几个Key的数据分布不均匀
- JOIN操作中,两个数据集都比较大,有很多Key的数据分布不均匀
- 数据集中少数几个key数据量很大,不重要,其他数据均匀
注意:
需要处理的数据倾斜问题就是Shuffle后数据的分布是否均匀问题
只要保证最后的结果是正确的,可以采用任何方式来处理数据倾斜,只要保证在处理过程中不发生数据倾斜就可以
5. 数据倾斜的处理方法
发现数据倾斜的时候,不要急于提高executor的资源,修改参数或是修改程序,首先要检查数据本身,是否存在异常数据。
5.1 检查数据,找出异常的key
如果任务长时间卡在最后1个(几个)任务,首先要对key进行抽样分析,判断是哪些key造成的。
选取key,对数据进行抽样,统计出现的次数,根据出现次数大小排序取出前几个
df.select("key").sample(false,0.1).(k=>(k,1)).reduceBykey(_+_).map(k=>(k._2,k._1)).sortByKey(false).take(10)
如果发现多数数据分布都较为平均,而个别数据比其他数据大上若干个数量级,则说明发生了数据倾斜。
经过分析,倾斜的数据主要有以下三种情况:
null(空值)或是一些无意义的信息()之类的,大多是这个原因引起。
无效数据,大量重复的测试数据或是对结果影响不大的有效数据。
有效数据,业务导致的正常数据分布。
解决办法
第1,2种情况,直接对数据进行过滤即可。
第3种情况则需要进行一些特殊操作,常见的有以下几种做法。
隔离执行,将异常的key过滤出来单独处理,最后与正常数据的处理结果进行union操作。
对key先添加随机值,进行操作后,去掉随机值,再进行一次操作。
使用reduceByKey 代替 groupByKey
使用map join。
举例:
如果使用reduceByKey因为数据倾斜造成运行失败的问题。具体操作如下:将原始的 key 转化为 key + 随机值(例如Random.nextInt)
对数据进行 reduceByKey(func)
将 key + 随机值 转成 key
再对数据进行 reduceByKey(func)
tip1: 如果此时依旧存在问题,建议筛选出倾斜的数据单独处理。最后将这份数据与正常的数据进行union即可。tips2: 单独处理异常数据时,可以配合使用Map Join解决
5.1.1 数据源中的数据分布不均匀,Spark需要频繁交互
解决方案1:避免数据源的数据倾斜
实现原理:通过在Hive中对倾斜的数据进行预处理,以及在进行kafka数据分发时尽量进行平均分配。这种方案从根源上解决了数据倾斜,彻底避免了在Spark中执行shuffle类算子,那么肯定就不会有数据倾斜的问题了。
方案优点:实现起来简单便捷,效果还非常好,完全规避掉了数据倾斜,Spark作业的性能会大幅度提升。
方案缺点:治标不治本,Hive或者Kafka中还是会发生数据倾斜。
适用情况:在一些Java系统与Spark结合使用的项目中,会出现Java代码频繁调用Spark作业的场景,而且对Spark作业的执行性能要求很高,就比较适合使用这种方案。将数据倾斜提前到上游的Hive ETL,每天仅执行一次,只有那一次是比较慢的,而之后每次Java调用Spark作业时,执行速度都会很快,能够提供更好的用户体验。
总结:前台的Java系统和Spark有很频繁的交互,这个时候如果Spark能够在最短的时间内处理数据,往往会给前端有非常好的体验。这个时候可以将数据倾斜的问题抛给数据源端,在数据源端进行数据倾斜的处理。但是这种方案没有真正的处理数据倾斜问题
5.1.2 数据集中的不同Key由于分区方式,导致数据倾斜
解决方案1:调整并行度
实现原理:增加shuffle read task的数量,可以让原本分配给一个task的多个key分配给多个task,从而让每个task处理比原来更少的数据。
方案优点:实现起来比较简单,可以有效缓解和减轻数据倾斜的影响。
方案缺点:只是缓解了数据倾斜而已,没有彻底根除问题,根据实践经验来看,其效果有限。
实践经验:该方案通常无法彻底解决数据倾斜,因为如果出现一些极端情况,比如某个key对应的数据量有100万,那么无论你的task数量增加到多少,都无法处理。
总结:调整并行度:适合于有大量key由于分区算法或者分区数的问题,将key进行了不均匀分区,可以通过调大或者调小分区数来试试是否有效
解决方案2:
缓解数据倾斜**(自定义Partitioner)**
适用场景:大量不同的Key被分配到了相同的Task造成该Task数据量过大。
解决方案: 使用自定义的Partitioner实现类代替默认的HashPartitioner,尽量将所有不同的Key均匀分配到不同的Task中。
优势: 不影响原有的并行度设计。如果改变并行度,后续Stage的并行度也会默认改变,可能会影响后续Stage。
劣势: 适用场景有限,只能将不同Key分散开,对于同一Key对应数据集非常大的场景不适用。效果与调整并行度类似,只能缓解数据倾斜而不能完全消除数据倾斜。而且需要根据数据特点自定义专用的Partitioner,不够灵活。
5.2 检查Spark运行过程相关操作
5.2.1 JOIN操作中,一个数据集中的数据分布不均匀,另一个数据集较小(主要)
解决方案:Reduce side Join转变为Map side Join
方案适用场景:在对RDD使用join类操作,或者是在Spark SQL中使用join语句时,而且join操作中的一个RDD或表的数据量比较小(比如几百M),比较适用此方案。
方案实现原理:普通的join是会走shuffle过程的,而一旦shuffle,就相当于会将相同key的数据拉取到一个shuffle read task中再进行join,此时就是reduce join。但是如果一个RDD是比较小的,则可以采用广播小RDD全量数据+map算子来实现与join同样的效果,也就是map join,此时就不会发生shuffle操作,也就不会发生数据倾斜。
方案优点:对join操作导致的数据倾斜,效果非常好,因为根本就不会发生shuffle,也就根本不会发生数据倾斜。
方案缺点:适用场景较少,因为这个方案只适用于一个大表和一个小表的情况。
5.2.2 聚合操作中,数据集中的数据分布不均匀(主要)
解决方案:两阶段聚合(局部聚合+全局聚合)
适用场景:对RDD执行reduceByKey等聚合类shuffle算子或者在Spark SQL中使用group by语句进行分组聚合时,比较适用这种方案
实现原理:将原本相同的key通过附加随机前缀的方式,变成多个不同的key,就可以让原本被一个task处理的数据分散到多个task上去做局部聚合,进而解决单个task处理数据量过多的问题。接着去除掉随机前缀,再次进行全局聚合,就可以得到最终的结果。具体原理见下图。
优点:对于聚合类的shuffle操作导致的数据倾斜,效果是非常不错的。通常都可以解决掉数据倾斜,或者至少是大幅度缓解数据倾斜,将Spark作业的性能提升数倍以上。
缺点:仅仅适用于聚合类的shuffle操作,适用范围相对较窄。如果是join类的shuffle操作,还得用其他的解决方案
将相同key的数据分拆处理
5.2.3 JOIN操作中,两个数据集都比较大,其中只有几个Key的数据分布不均匀
解决方案:为倾斜key增加随机前/后缀
适用场景:两张表都比较大,无法使用Map侧Join。其中一个RDD有少数几个Key的数据量过大,另外一个RDD的Key分布较为均匀。
解决方案:将有数据倾斜的RDD中倾斜Key对应的数据集单独抽取出来加上随机前缀,另外一个RDD每条数据分别与随机前缀结合形成新的RDD(笛卡尔积,相当于将其数据增到到原来的N倍,N即为随机前缀的总个数),然后将二者Join后去掉前缀。然后将不包含倾斜Key的剩余数据进行Join。最后将两次Join的结果集通过union合并,即可得到全部Join结果。
优势:相对于Map侧Join,更能适应大数据集的Join。如果资源充足,倾斜部分数据集与非倾斜部分数据集可并行进行,效率提升明显。且只针对倾斜部分的数据做数据扩展,增加的资源消耗有限。
劣势:如果倾斜Key非常多,则另一侧数据膨胀非常大,此方案不适用。而且此时对倾斜Key与非倾斜Key分开处理,需要扫描数据集两遍,增加了开销。
注意:具有倾斜Key的RDD数据集中,key的数量比较少
5.2.4 JOIN操作中,两个数据集都比较大,有很多Key的数据分布不均匀
解决方案:随机前缀和扩容RDD进行join
适用场景:如果在进行join操作时,RDD中有大量的key导致数据倾斜,那么进行分拆key也没什么意义。
实现思路:将该RDD的每条数据都打上一个n以内的随机前缀。同时对另外一个正常的RDD进行扩容,将每条数据都扩容成n条数据,扩容出来的每条数据都依次打上一个0~n的前缀。最后将两个处理后的RDD进行join即可。和上一种方案是尽量只对少数倾斜key对应的数据进行特殊处理,由于处理过程需要扩容RDD,因此上一种方案扩容RDD后对内存的占用并不大;而这一种方案是针对有大量倾斜key的情况,没法将部分key拆分出来进行单独处理,因此只能对整个RDD进行数据扩容,对内存资源要求很高。
优点:对join类型的数据倾斜基本都可以处理,而且效果也相对比较显著,性能提升效果非常不错。
缺点:该方案更多的是缓解数据倾斜,而不是彻底避免数据倾斜。而且需要对整个RDD进行扩容,对内存资源要求很高。
实践经验:曾经开发一个数据需求的时候,发现一个join导致了数据倾斜。优化之前,作业的执行时间大约是60分钟左右;使用该方案优化之后,执行时间缩短到10分钟左右,性能提升了6倍。
注意:将倾斜Key添加1-N的随机前缀,并将被Join的数据集相应的扩大N倍(需要将1-N数字添加到每一条数据上作为前缀)
5.2.5 数据集中少数几个key数据量很大,不重要,其他数据均匀
解决方案:过滤少数倾斜Key
适用场景:如果发现导致倾斜的key就少数几个,而且对计算本身的影响并不大的话,那么很适合使用这种方案。比如99%的key就对应10条数据,但是只有一个key对应了100万数据,从而导致了数据倾斜。
优点:实现简单,而且效果也很好,可以完全规避掉数据倾斜。
缺点:适用场景不多,大多数情况下,导致倾斜的key还是很多的,并不是只有少数几个。
实践经验:在项目中我们也采用过这种方案解决数据倾斜。有一次发现某一天Spark作业在运行的时候突然OOM了,追查之后发现,是Hive表中的某一个key在那天数据异常,导致数据量暴增。因此就采取每次执行前先进行采样,计算出样本中数据量最大的几个key之后,直接在程序中将那些key给过滤掉。
4.13 HiveSQL 的优化(系统参数调整、SQL 语句优化)
Hive优化目标
- 在有限的资源下,执行效率更高
常见问题
- 数据倾斜
- map数设置
- reduce数设置
- 其他
Hive执行
HQL –> Job –> Map/Reduce
执行计划
- explain [extended] hql
- 样例
- select col,count(1) from test2 group by col;
- explain select col,count(1) from test2 group by col;
Hive表优化
分区
- set hive.exec.dynamic.partition=true;
- set hive.exec.dynamic.partition.mode=nonstrict;
- 静态分区
- 动态分区
分桶
- set hive.enforce.bucketing=true;
- set hive.enforce.sorting=true;
数据
- 相同数据尽量聚集在一起
Hive Job优化
并行化执行
- 每个查询被hive转化成多个阶段,有些阶段关联性不大,则可以并行化执行,减少执行时间
- set hive.exec.parallel= true;
- set hive.exec.parallel.thread.numbe=8;
本地化执行
- job的输入数据大小必须小于参数:hive.exec.mode.local.auto.inputbytes.max(默认128MB)
- job的map数必须小于参数:hive.exec.mode.local.auto.tasks.max(默认4)
- job的reduce数必须为0或者1
- set hive.exec.mode.local.auto=true;
- 当一个job满足如下条件才能真正使用本地模式:
job合并输入小文件
- set hive.input.format = org.apache.hadoop.hive.ql.io.CombineHiveInputFormat
- 合并文件数由mapred.max.split.size限制的大小决定
job合并输出小文件**
- set hive.merge.smallfiles.avgsize=256000000;当输出文件平均小于该值,启动新job合并文件
- set hive.merge.size.per.task=64000000;合并之后的文件大小
JVM重利用
- set mapred.job.reuse.jvm.num.tasks=20;
- JVM重利用可以使得JOB长时间保留slot,直到作业结束,这在对于有较多任务和较多小文件的任务是非常有意义的,减少执行时间。当然这个值不能设置过大,因为有些作业会有reduce任务,如果reduce任务没有完成,则map任务占用的slot不能释放,其他的作业可能就需要等待。
压缩数据
- set hive.exec.compress.output=true;
- set mapred.output.compreession.codec=org.apache.hadoop.io.compress.GzipCodec;
- set mapred.output.compression.type=BLOCK;
- set hive.exec.compress.intermediate=true;
- set hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
- set hive.intermediate.compression.type=BLOCK;
- 中间压缩就是处理hive查询的多个job之间的数据,对于中间压缩,最好选择一个节省cpu耗时的压缩方式
- hive查询最终的输出也可以压缩
Hive Map优化
set mapred.map.tasks =10; 无效
(1)默认map个数
- default_num=total_size/block_size;
(2)期望大小
- goal_num=mapred.map.tasks;
(3)设置处理的文件大小
- split_size=max(mapred.min.split.size,block_size);
- split_num=total_size/split_size;
(4)计算的map个数
- compute_map_num=min(split_num,max(default_num,goal_num))
经过以上的分析,在设置map个数的时候,可以简答的总结为以下几点:
- 增大mapred.min.split.size的值
- 如果想增加map个数,则设置mapred.map.tasks为一个较大的值
- 如果想减小map个数,则设置mapred.min.split.size为一个较大的值
- 情况1:输入文件size巨大,但不是小文件
- 情况2:输入文件数量巨大,且都是小文件,就是单个文件的size小于blockSize。这种情况通过增大mapred.min.split.size不可行,需要使用combineFileInputFormat将多个input path合并成一个InputSplit送给mapper处理,从而减少mapper的数量。
map端聚合
- set hive.map.aggr=true;
推测执行
- mapred.map.tasks.apeculative.execution
Hive Shuffle优化
Map端
- io.sort.mb
- io.sort.spill.percent
- min.num.spill.for.combine
- io.sort.factor
- io.sort.record.percent
Reduce端
- mapred.reduce.parallel.copies
- mapred.reduce.copy.backoff
- io.sort.factor
- mapred.job.shuffle.input.buffer.percent
- mapred.job.shuffle.input.buffer.percent
- mapred.job.shuffle.input.buffer.percent
Hive Reduce优化
需要reduce操作的查询
- group by,join,distribute by,cluster by…
- order by比较特殊,只需要一个reduce
- sum,count,distinct…
- 聚合函数
- 高级查询
推测执行
- mapred.reduce.tasks.speculative.execution
- hive.mapred.reduce.tasks.speculative.execution
Reduce优化
- numRTasks = min[maxReducers,input.size/perReducer]
- maxReducers=hive.exec.reducers.max
- perReducer = hive.exec.reducers.bytes.per.reducer
- hive.exec.reducers.max 默认 :999
- hive.exec.reducers.bytes.per.reducer 默认:1G
- set mapred.reduce.tasks=10;直接设置
- 计算公式
Hive查询操作优化
join优化
- 关联操作中有一张表非常小
- 不等值的链接操作
- set hive.auto.current.join=true;
- hive.mapjoin.smalltable.filesize默认值是25mb
- select
/*+mapjoin(A)*/
f.a,f.b from A t join B f on (f.a=t.a) - hive.optimize.skewjoin=true;如果是Join过程出现倾斜,应该设置为true
- set hive.skewjoin.key=100000; 这个是join的键对应的记录条数超过这个值则会进行优化
- mapjoin
- 简单总结下,mapjoin的使用场景:
Bucket join
- 两个表以相同方式划分桶
- 两个表的桶个数是倍数关系
- crete table order(cid int,price float) clustered by(cid) into 32 buckets;
- crete table customer(id int,first string) clustered by(id) into 32 buckets;
- select price from order t join customer s on t.cid=s.id
join 优化前
select m.cid,u.id from order m join customer u on m.cid=u.id where m.dt='2013-12-12';
- join优化后
select m.cid,u.id from (select cid from order where dt='2013-12-12')m join customer u on m.cid=u.id;
group by 优化
hive.groupby.skewindata=true;
如果是group by 过程出现倾斜 应该设置为trueset hive.groupby.mapaggr.checkinterval=100000;-
-这个是group的键对应的记录条数超过这个值则会进行优化
count distinct 优化
优化前
select count(distinct id) from tablename
优化后
select count(1) from (select distinct id from tablename) tmp; select count(1) from (select id from tablename group by id) tmp;
优化前
select a,sum(b),count(distinct c),count(distinct d) from test group by a
优化后
select a,sum(b) as b,count(c) as c,count(d) as d from(select a,0 as b,c,null as d from test group by a,c union all select a,0 as b,null as c,d from test group by a,d union all select a,b,null as c,null as d from test)tmp1 group by a;
#二. Spark篇
1. SparkCore
1.1 Spark工作原理
1. Spark是什么
Spark是一种通用分布式并行计算框架。和Mapreduce最大不同就是spark是基于内存的迭代式计算。
Spark的Job处理的中间输出结果可以保存在内存中,从而不再需要读写HDFS,除此之外,一个MapReduce 在计算过程中只有map 和reduce 两个阶段,处理之后就结束了,而在Spark的计算模型中,可以分为n阶段,因为它内存迭代式的,我们在处理完一个阶段以后,可以继续往下处理很多个阶段,而不只是两个阶段。
因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。其不仅实现了MapReduce的算子map 函数和reduce函数及计算模型,还提供更为丰富的算子,如filter、join、groupByKey等。是一个用来实现快速而同用的集群计算的平台。
2. Spark工作原理框图
第一层级
工作流程
a. 构建Spark Application的运行环境(启动SparkContext)
b. SparkContext在初始化过程中分别创建DAGScheduler作业调度和TaskScheduler任务调度两级调度模块
c. SparkContext向资源管理器(可以是Standalone、Mesos、Yarn)申请运行Executor资源;
d. 由资源管理器分配资源并启动StandaloneExecutorBackend,executor,之后向SparkContext申请Task;
e. DAGScheduler将job 划分为多个stage,并将Stage提交给TaskScheduler;
g. Task在Executor上运行,运行完毕释放所有资源。
第二层级
DAGScheduler作业调度生成过程
DAGScheduler是一个面向stage 的作业调度器。
作业调度模块是基于任务阶段的高层调度模块,它为每个Spark作业计算具有依赖关系的多个调度阶段(通常根据shuffle来划分),然后为每个阶段构建出一组具体的任务(通常会考虑数据的本地性等),然后以TaskSets(任务组)的形式提交给任务调度模块来具体执行。
主要三大功能
接受用户提交的job。将job根据类型划分为不同的stage,记录哪些RDD,stage被物化,并在每一个stage内产生一系列的task,并封装成taskset;
决定每个task的最佳位置,任务在数据所在节点上运行,并结合当前的缓存情况,将taskSet提交给TaskScheduler;
重新提交shuffle输出丢失的stage给taskScheduler;
DAG如何将Job划分为多个stage
划分依据:**宽窄依赖**。何时产生宽依赖就会产生一个新的stage,例如reduceByKey,groupByKey,join的算子,会导致宽依赖的产生;一旦遇到宽依赖就划分,然后先提交没有父阶段的stage们,并在提交过程中,计算该stage的task数目以及类型,并提交具体的task,在这些无父阶段的stage提交完之后,依赖该stage 的stage才会提交。
切割规则:从后往前,遇到宽依赖就切割stage;
Spark任务会根据RDD之间的依赖关系,形成一个DAG有向无环图,DAG会提交给DAGScheduler,DAGScheduler会把DAG划分相互依赖的多个stage,划分依据就是宽窄依赖,遇到宽依赖就划分stage,每个stage包含一个或多个task,然后将这些task以taskSet的形式提交给TaskScheduler运行,stage是由一组并行的task组成。
一旦driver程序中出现action,就会生成一个job,比如count等,
向DAGScheduler提交job,如果driver程序后面还有action,那么其他action也会对应生成相应的job,所以,driver端有多少action就会提交多少job,这可能就是为什么spark将driver程序称为application而不是job 的原因。
每一个job可能会包含一个或者多个stage,最后一个stage生成result,在提交job 的过程中,DAGScheduler会首先从后往前划分stage,划分的标准就是宽依赖,一旦遇到宽依赖就划分,然后先提交没有父阶段的stage们,并在提交过程中,计算该stage的task数目以及类型,并提交具体的task,在这些无父阶段的stage提交完之后,依赖该stage 的stage才会提交。
第三层级
谈谈spark中的宽窄依赖
RDD和它的父RDD的关系有两种类型:窄依赖和宽依赖
- 宽依赖:指的是多个子RDD的Partition会依赖同一个父RDD的Partition,关系是一对多,父RDD的一个分区的数据去到子RDD的不同分区里面,会有shuffle的产生
- 窄依赖:指的是每一个父RDD的Partition最多被子RDD的一个partition使用,是一对一的,也就是父RDD的一个分区去到了子RDD的一个分区中,这个过程没有shuffle产生
区分的标准就是看父RDD的一个分区的数据的流向,要是流向一个partition的话就是窄依赖,否则就是宽依赖,如图所示:
1.2 Spark的shuffle原理和过程
Shuffle过程框图
主要逻辑如下:
1)首先每一个MapTask会根据ReduceTask的数量创建出相应的bucket,bucket的数量是M×R,其中M是Map的个数,R是Reduce的个数。
2)其次MapTask产生的结果会根据设置的partition算法填充到每个bucket中。这里的partition算法是可以自定义的,当然默认的算法是根据key哈希到不同的bucket中。
当ReduceTask启动时,它会根据自己task的id和所依赖的Mapper的id从远端或本地的block manager中取得相应的bucket作为Reducer的输入进行处理。
这里的bucket是一个抽象概念,在实现中每个bucket可以对应一个文件,可以对应文件的一部分或是其他等。
Spark shuffle可以分为两部分:Shuffle Write 和 Shuffle Fetch
Shuffle Write
由于不要求数据有序,shuffle write 的任务很简单:将数据 partition 好,并持久化。之所以要持久化,一方面是要减少内存存储空间压力,另一方面也是为了 fault-tolerance。
shuffle write 的任务很简单,那么实现也很简单:将 shuffle write 的处理逻辑加入到 ShuffleMapStage(ShuffleMapTask 所在的 stage) 的最后,该 stage 的 final RDD 每输出一个 record 就将其 partition 并持久化。图示如下:
上图有 4 个 ShuffleMapTask 要在同一个 worker node 上运行,CPU core 数为 2,可以同时运行两个 task。每个 task 的执行结果(该 stage 的 finalRDD 中某个 partition 包含的 records)被逐一写到本地磁盘上。每个 task 包含 R 个缓冲区,R = reducer 个数(也就是下一个 stage 中 task 的个数),缓冲区被称为 bucket,其大小为spark.shuffle.file.buffer.kb
,默认是 32KB(Spark 1.1 版本以前是 100KB)。
其实 bucket 是一个广义的概念,代表 ShuffleMapTask 输出结果经过 partition 后要存放的地方,这里为了细化数据存放位置和数据名称,仅仅用 bucket 表示缓冲区。
ShuffleMapTask 的执行过程很简单:先利用 pipeline 计算得到 finalRDD 中对应 partition 的 records。每得到一个 record 就将其送到对应的 bucket 里,具体是哪个 bucket 由partitioner.partition(record.getKey()))
决定。每个 bucket 里面的数据会不断被写到本地磁盘上,形成一个 ShuffleBlockFile,或者简称 FileSegment。之后的 reducer 会去 fetch 属于自己的 FileSegment,进入 shuffle read 阶段。
这样的实现很简单,但有几个问题:
- 产生的 FileSegment 过多。每个 ShuffleMapTask 产生 R(reducer 个数)个 FileSegment,M 个 ShuffleMapTask 就会产生 M * R 个文件。一般 Spark job 的 M 和 R 都很大,因此磁盘上会存在大量的数据文件。
- 缓冲区占用内存空间大。每个 ShuffleMapTask 需要开 R 个 bucket,M 个 ShuffleMapTask 就会产生 M R 个 bucket。虽然一个 ShuffleMapTask 结束后,对应的缓冲区可以被回收,但一个 worker node 上同时存在的 bucket 个数可以达到 cores R 个(一般 worker 同时可以运行 cores 个 ShuffleMapTask),占用的内存空间也就达到了
cores * R * 32 KB
。对于 8 核 1000 个 reducer 来说,占用内存就是 256MB。
目前来看,第二个问题还没有好的方法解决,因为写磁盘终究是要开缓冲区的,缓冲区太小会影响 IO 速度。但第一个问题有一些方法去解决,下面介绍已经在 Spark 里面实现的 FileConsolidation 方法。先上图:
可以明显看出,在一个 core 上连续执行的 ShuffleMapTasks 可以共用一个输出文件 ShuffleFile。先执行完的 ShuffleMapTask 形成 ShuffleBlocki,后执行的 ShuffleMapTask 可以将输出数据直接追加到 ShuffleBlock i 后面,形成 ShuffleBlocki’,每个 ShuffleBlock 被称为 FileSegment。下一个 stage 的 reducer 只需要 fetch 整个 ShuffleFile 就行了。这样,每个 worker 持有的文件数降为 cores * R。FileConsolidation 功能可以通过spark.shuffle.consolidateFiles=true
来开启。
Shuffle Fetch
先看一张包含 ShuffleDependency 的物理执行图,来自 reduceByKey:
很自然地,要计算 ShuffleRDD 中的数据,必须先把 MapPartitionsRDD 中的数据 fetch 过来。那么问题就来了:
- 在什么时候 fetch,parent stage 中的一个 ShuffleMapTask 执行完还是等全部 ShuffleMapTasks 执行完?
- 边 fetch 边处理还是一次性 fetch 完再处理?
- fetch 来的数据存放到哪里?
- 怎么获得要 fetch 的数据的存放位置?
在什么时候 fetch?当 parent stage 的所有 ShuffleMapTasks 结束后再 fetch。理论上讲,一个 ShuffleMapTask 结束后就可以 fetch,但是为了迎合 stage 的概念(即一个 stage 如果其 parent stages 没有执行完,自己是不能被提交执行的),还是选择全部 ShuffleMapTasks 执行完再去 fetch。因为 fetch 来的 FileSegments 要先在内存做缓冲,所以一次 fetch 的 FileSegments 总大小不能太大。Spark 规定这个缓冲界限不能超过
spark.reducer.maxMbInFlight
,这里用 softBuffer 表示,默认大小为 48MB。一个 softBuffer 里面一般包含多个 FileSegment,但如果某个 FileSegment 特别大的话,这一个就可以填满甚至超过 softBuffer 的界限。边 fetch 边处理还是一次性 fetch 完再处理?边 fetch 边处理。本质上,MapReduce shuffle 阶段就是边 fetch 边使用 combine() 进行处理,只是 combine() 处理的是部分数据。MapReduce 为了让进入 reduce() 的 records 有序,必须等到全部数据都 shuffle-sort 后再开始 reduce()。因为 Spark 不要求 shuffle 后的数据全局有序,因此没必要等到全部数据 shuffle 完成后再处理。那么如何实现边 shuffle 边处理,而且流入的 records 是无序的?答案是使用可以 aggregate 的数据结构,比如 HashMap。每 shuffle 得到(从缓冲的 FileSegment 中 deserialize 出来)一个 \ record,直接将其放进 HashMap 里面。如果该 HashMap 已经存在相应的 Key,那么直接进行 aggregate 也就是
func(hashMap.get(Key), Value)
,比如上面 WordCount 例子中的 func 就是hashMap.get(Key) + Value
,并将 func 的结果重新 put(key) 到 HashMap 中去。这个 func 功能上相当于 reduce(),但实际处理数据的方式与 MapReduce reduce() 有差别,差别相当于下面两段程序的差别。// MapReduce reduce(K key, Iterable<V> values) { result = process(key, values) return result } // Spark reduce(K key, Iterable<V> values) { result = null for (V value : values) result = func(result, value) return result }
MapReduce 可以在 process 函数里面可以定义任何数据结构,也可以将部分或全部的 values 都 cache 后再进行处理,非常灵活。而 Spark 中的 func 的输入参数是固定的,一个是上一个 record 的处理结果,另一个是当前读入的 record,它们经过 func 处理后的结果被下一个 record 处理时使用。因此一些算法比如求平均数,在 process 里面很好实现,直接
sum(values)/values.length
,而在 Spark 中 func 可以实现sum(values)
,但不好实现/values.length
。更多的 func 将会在下面的章节细致分析。fetch 来的数据存放到哪里?刚 fetch 来的 FileSegment 存放在 softBuffer 缓冲区,经过处理后的数据放在内存 + 磁盘上。这里我们主要讨论处理后的数据,可以灵活设置这些数据是“只用内存”还是“内存+磁盘”。如果
spark.shuffle.spill = false
就只用内存。内存使用的是AppendOnlyMap
,类似 Java 的HashMap
,内存+磁盘使用的是ExternalAppendOnlyMap
,如果内存空间不足时,ExternalAppendOnlyMap
可以将 \ records 进行 sort 后 spill 到磁盘上,等到需要它们的时候再进行归并,后面会详解。使用“内存+磁盘”的一个主要问题就是如何在两者之间取得平衡?在 Hadoop MapReduce 中,默认将 reducer 的 70% 的内存空间用于存放 shuffle 来的数据,等到这个空间利用率达到 66% 的时候就开始 merge-combine()-spill。在 Spark 中,也适用同样的策略,一旦 ExternalAppendOnlyMap 达到一个阈值就开始 spill,具体细节下面会讨论。怎么获得要 fetch 的数据的存放位置?在上一章讨论物理执行图中的 stage 划分的时候,我们强调 “一个 ShuffleMapStage 形成后,会将该 stage 最后一个 final RDD 注册到
MapOutputTrackerMaster.registerShuffle(shuffleId, rdd.partitions.size)
,这一步很重要,因为 shuffle 过程需要 MapOutputTrackerMaster 来指示 ShuffleMapTask 输出数据的位置”。因此,reducer 在 shuffle 的时候是要去 driver 里面的 MapOutputTrackerMaster 询问 ShuffleMapTask 输出的数据位置的。每个 ShuffleMapTask 完成时会将 FileSegment 的存储位置信息汇报给 MapOutputTrackerMaster。
1.3 Spark的Stage划分及优化
窄依赖指父RDD的每一个分区最多被一个子RDD的分区所用,表现为
一个父RDD的分区对应于一个子RDD的分区
两个父RDD的分区对应于一个子RDD 的分区。
宽依赖指子RDD的每个分区都要依赖于父RDD的所有分区,这是shuffle类操作
Stage:
一个Job会被拆分为多组Task,每组任务被称为一个Stage就像Map Stage, Reduce Stage。Stage的划分,简单的说是以shuffle和result这两种类型来划分。在Spark中有两类task,一类是shuffleMapTask,一类是resultTask,第一类task的输出是shuffle所需数据,第二类task的输出是result,stage的划分也以此为依据,shuffle之前的所有变换是一个stage,shuffle之后的操作是另一个stage。
比如 rdd.parallize(1 to 10).foreach(println) 这个操作没有shuffle,直接就输出了,那么只有它的task是resultTask,stage也只有一个;
如果是rdd.map(x => (x, 1)).reduceByKey(_ + _).foreach(println), 这个job因为有reduce,所以有一个shuffle过程,那么reduceByKey之前的是一个stage,执行shuffleMapTask,输出shuffle所需的数据,reduceByKey到最后是一个stage,直接就输出结果了。如果job中有多次shuffle,那么每个shuffle之前都是一个stage.
会根据RDD之间的依赖关系将DAG图划分为不同的阶段,对于窄依赖,由于partition依赖关系的确定性,partition的转换处理就可以在同一个线程里完成,窄依赖就被spark划分到同一个stage中,而对于宽依赖,只能等父RDD shuffle处理完成后,下一个stage才能开始接下来的计算。之所以称之为ShuffleMapTask是因为它需要将自己的计算结果通过shuffle到下一个stage中
Stage划分思路
因此spark划分stage的整体思路是:从后往前推,遇到宽依赖就断开,划分为一个stage;遇到窄依赖就将这个RDD加入该stage中。
在spark中,Task的类型分为2种:ShuffleMapTask和ResultTask;简单来说,DAG的最后一个阶段会为每个结果的partition生成一个ResultTask,即每个Stage里面的Task的数量是由该Stage中最后一个RDD的Partition的数量所决定的!
而其余所有阶段都会生成ShuffleMapTask;之所以称之为ShuffleMapTask是因为它需要将自己的计算结果通过shuffle到下一个stage中。
总结
map,filter为窄依赖,
groupbykey为宽依赖
遇到一个宽依赖就分一个stage
1.4 Spark和MapReduce的区别
整体对比概念
Spark Shuffle 与MapReduce Shuffle的设计思想相同,但是实现细节优化方式不同。
1. 从逻辑角度来讲,Shuffle 过程就是一个 GroupByKey 的过程,两者没有本质区别。
只是 MapReduce 为了方便 GroupBy 存在于不同 partition 中的 key/value records,就提前对 key 进行排序。Spark 认为很多应用不需要对 key 排序,就默认没有在 GroupBy 的过程中对 key 排序。2. 从数据流角度讲,两者有差别。
MapReduce 只能从一个 Map Stage shuffle 数据,Spark 可以从多个 Map Stages shuffle 数据3 .Shuffle write/read 实现上有一些区别。
以前对 shuffle write/read 的分类是 sort-based 和 hash-based。MapReduce 可以说是 sort-based,shuffle write 和 shuffle read 过程都是基于key sorting 的 (buffering records + in-memory sort + on-disk external sorting)。早期的 Spark 是 hash-based,shuffle write 和 shuffle read 都使用 HashMap-like 的数据结构进行 aggregate (without key sorting)。但目前的 Spark 是两者的结合体,shuffle write 可以是 sort-based (only sort partition id, without key sorting),shuffle read 阶段可以是 hash-based。因此,目前 sort-based 和 hash-based 已经“你中有我,我中有你”,界限已经不那么清晰。4. 从数据 fetch 与数据计算的重叠粒度来讲,两者有细微区别。
MapReduce 是粗粒度,reducer fetch 到的 records 先被放到shuffle buffer
中休息,当 shuffle buffer 快满时,才对它们进行 combine()。而 Spark 是细粒度,可以即时将 fetch 到的 record 与 HashMap 中相同 key 的 record 进行 aggregate。
解说:
1、MapReduce在Map阶段完成之后数据会被写入到内存中的一个环形缓冲区(后续的分区/分组/排序在这里完成);Spark的Map阶段完成之后直接输出到磁盘。
2、受第一步的影响,MapReduce输出的数据是有序的(针对单个Map数据来说);Spark的数据是无序的(可以使用RDD算子达到排序的效果)。
3、MapReduce缓冲区的数据处理完之后会spill到磁盘形成一个文件,文件数量达到阈值之后将会进行merge操作,将多个小文件合并为一个大文件;Spark没有merge过程,一个Map中如果有对应多个Reduce的数据,则直接写多个磁盘文件。
4、MapReduce全部通过网络来获得数据;对于本地数据Spark可以直接读取
1.5 宽依赖与窄依赖区别
RDD和它的父RDD的关系有两种类型:窄依赖和宽依赖
- 宽依赖:指的是多个子RDD的Partition会依赖同一个父RDD的Partition,关系是一对多,父RDD的一个分区的数据去到子RDD的不同分区里面,会有shuffle的产生
- 窄依赖:指的是每一个父RDD的Partition最多被子RDD的一个partition使用,是一对一的,也就是父RDD的一个分区去到了子RDD的一个分区中,这个过程没有shuffle产生
区分的标准就是看父RDD的一个分区的数据的流向,要是流向一个partition的话就是窄依赖,否则就是宽依赖,如图所示:
1.6 Spark RDD 原理
1. RDD是什么
RDD(Resilient Distributed Dataset)叫做分布式数据集,是spark中最基本的数据抽象,它代表一个不可变,可分区,里面的元素可以并行计算的集合
- Dataset:就是一个集合,用于存放数据的
- Destributed:分布式,可以并行在集群计算
- Resilient:表示弹性的,弹性表示
- RDD中的数据可以存储在内存或者磁盘中;
- RDD中的分区是可以改变的;
- A list of partitions:一个分区列表,RDD中的数据都存储在一个分区列表中
- A function for computing each split:作用在每一个分区中的函数
- A list of dependencies on other RDDs:一个RDD依赖于其他多个RDD,这个点很重要,RDD的容错机制就是依据这个特性而来的
- Optionally,a Partitioner for key-value RDDs(eg:to say that the RDD is hash-partitioned):可选的,针对于kv类型的RDD才有这个特性,作用是决定了数据的来源以及数据处理后的去向
- 可选项,数据本地性,数据位置最优
2. RDD操作
RDD创建后就可以在RDD上进行数据处理。RDD支持两种操作:转换(transformation),即从现有的数据集创建一个新的数据集;动作(action),即在数据集上进行计算后,返回一个值给Driver程序。
- 转换(transformation)
RDD 的转化操作是返回一个新的 RDD 的操作,比如 map() 和 filter() ,而行动操作则是向驱动器程序返回结果或把结果写入外部系统的操作,会触发实际的计算,比如 count() 和 first() 。Spark 对待转化操作和行动操作的方式很不一样,因此理解你正在进行的操作的类型是很重要的。如果对于一个特定的函数是属于转化操作还是行动操作感到困惑,你可以看看它的返回值类型:转化操作返回的是 RDD,而行动操作返回的是其他的数据类型。
RDD中所有的Transformation都是惰性的,也就是说,它们并不会直接计算结果。相反的它们只是记住了这些应用到基础数据集(例如一个文件)上的转换动作。只有当发生一个要求返回结果给Driver的Action时,这些Transformation才会真正运行。
#### map(func)**
返回一个新的分布式数据集,该数据集由每一个输入元素经过func函数转换后组成
#### **fitler(func)**
返回一个新的数据集,该数据集由经过func函数计算后返回值为true的输入元素组成
#### **flatMap(func)**
类似于map,但是每一个输入元素可以被映射为0或多个输出元素(因此func返回一个序列,而不是单一元素)
#### **mapPartitions(func)**
类似于map,但独立地在RDD上每一个分片上运行,因此在类型为T的RDD上运行时,func函数类型必须是Iterator[T]=>Iterator[U]
#### **mapPartitionsWithSplit(func)**
类似于mapPartitons,但func带有一个整数参数表示分片的索引值。因此在类型为T的RDD上运行时,func函数类型必须是(Int,Iterator[T])=>Iterator[U]
#### **sample(withReplacement,fraction,seed)**
根据fraction指定的比例对数据进行采样,可以选择是否用随机数进行替换,seed用于随机数生成器种子
#### **union(otherDataSet)**
返回一个新数据集,新数据集是由原数据集和参数数据集联合而成
#### **distinct([numTasks])**
返回一个包含原数据集中所有不重复元素的新数据集
#### **groupByKey([numTasks])**
在一个(K,V)数据集上调用,返回一个(K,Seq[V])对的数据集。注意默认情况下,只有8个并行任务来操作,但是可以传入一个可选的numTasks参数来改变它
#### **reduceByKey(func,[numTasks])**
在一个(K,V)对的数据集上调用,返回一个(K,V)对的数据集,使用指定的reduce函数,将相同的key的值聚合到一起。与groupByKey类似,reduceByKey任务的个数是可以通过第二个可选参数来设置的
#### **sortByKey([[ascending],numTasks])**
在一个(K,V)对的数据集上调用,K必须实现Ordered接口,返回一个按照Key进行排序的(K,V)对数据集。升序或降序由ascending布尔参数决定
#### **join(otherDataset0,[numTasks])**
在类型为(K,V)和(K,W)数据集上调用,返回一个相同的key对应的所有元素在一起的(K,(V,W))数据集
#### **cogroup(otherDataset,[numTasks])**
在类型为(K,V)和(K,W)数据集上调用,返回一个(K,Seq[V],Seq[W])元祖的数据集。这个操作也可以称为groupwith
#### **cartesain(ohterDataset)**
笛卡尔积,在类型为T和U类型的数据集上调用,返回一个(T,U)对数据集(两两的元素对)
- 动作(action)
#### **reduce(func)**
通过函数func(接收两个参数,返回一个参数)聚集数据集中的所有元素。这个功能必须可交换且可关联的,从而可以正确的并行运行
#### **collect()**
在驱动程序中,以数组形式返回数据集中的所有元素。通常在使用filter或者其他操作返回一个足够小的数据子集后再使用会比较有用
#### **count()**
返回数据集元素个数
#### **first()**
返回数据集第一个元素(类似于take(1))
#### **take(n)**
返回一个由数据集前n个元素组成的数组
注意 这个操作目前并非并行执行,而是由驱动程序计算所有的元素
#### **takeSample(withReplacement,num,seed)**
返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否由随机数替换不足的部分,seed用户指定随机数生成器种子
#### **saveAsTextFile(path)**
将数据集的元素以textfile的形式保存到本地文件系统—HDFS或者任何其他Hadoop支持的文件系统。对于每个元素,Spark将会调用toString方法,将它转换为文件中的文本行
#### **saveAsSequenceFile(path)**
将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以是本地系统、HDFS或者任何其他的Hadoop支持的文件系统。这个只限于由key-value对组成,并实现了Hadoop的Writable接口,或者可以隐式的转换为Writable的RDD(Spark包括了基本类型转换,例如Int、Double、String等)
#### **countByKey()**
对(K,V)类型的RDD有效,返回一个(K,Int)对的map,表示每一个key对应的元素个数
#### **foreach(func)**
在数据集的每一个元素上,运行函数func进行更新。通常用于边缘效果,例如更新一个叠加器,或者和外部存储系统进行交互,如HBase
3. RDD共享变量
在应用开发中,一个函数被传递给Spark操作(例如map和reduce),在一个远程集群上运行,它实际上操作的是这个函数用到的所有变量的独立拷贝。这些变量会被拷贝到每一台机器。通常看来,在任务之间中,读写共享变量显然不够高效。然而,Spark还是为两种常见的使用模式,提供了两种有限的共享变量:广播变量和累加器。
(1). 广播变量(Broadcast Variables)
– 广播变量缓存到各个节点的内存中,而不是每个 Task
– 广播变量被创建后,能在集群中运行的任何函数调用
– 广播变量是只读的,不能在被广播后修改
– 对于大数据集的广播, Spark 尝试使用高效的广播算法来降低通信成本
val broadcastVar = sc.broadcast(Array(1, 2, 3))方法参数中是要广播的变量
(2). 累加器
累加器只支持加法操作,可以高效地并行,用于实现计数器和变量求和。Spark 原生支持数值类型和标准可变集合的计数器,但用户可以添加新的类型。只有驱动程序才能获取累加器的值。
4. RDD缓存
Spark可以使用 persist 和 cache 方法将任意 RDD 缓存到内存、磁盘文件系统中。缓存是容错的,如果一个 RDD 分片丢失,可以通过构建它的 transformation自动重构。被缓存的 RDD 被使用的时,存取速度会被大大加速。一般的executor内存60%做 cache, 剩下的40%做task。
Spark中,RDD类可以使用cache() 和 persist() 方法来缓存。cache()是persist()的特例,将该RDD缓存到内存中。而persist可以指定一个StorageLevel。StorageLevel的列表可以在StorageLevel 伴生单例对象中找到。
Spark的不同StorageLevel ,目的满足内存使用和CPU效率权衡上的不同需求。我们建议通过以下的步骤来进行选择:
如果你的RDDs可以很好的与默认的存储级别(MEMORY_ONLY)契合,就不需要做任何修改了。这已经是CPU使用效率最高的选项,它使得RDDs的操作尽可能的快。
如果不行,试着使用MEMORY_ONLY_SER并且选择一个快速序列化的库使得对象在有比较高的空间使用率的情况下,依然可以较快被访问。
尽可能不要存储到硬盘上,除非计算数据集的函数,计算量特别大,或者它们过滤了大量的数据。否则,重新计算一个分区的速度,和与从硬盘中读取基本差不多快。
如果你想有快速故障恢复能力,使用复制存储级别(例如:用Spark来响应web应用的请求)。所有的存储级别都有通过重新计算丢失数据恢复错误的容错机制,但是复制存储级别可以让你在RDD上持续的运行任务,而不需要等待丢失的分区被重新计算。
如果你想要定义你自己的存储级别(比如复制因子为3而不是2),可以使用StorageLevel 单例对象的apply()方法。
在不会使用cached RDD的时候,及时使用unpersist方法来释放它。
1.7 RDD有哪几种创建方式
1) 使用程序中的集合创建rdd
2) 使用本地文件系统创建rdd
3) 使用hdfs创建rdd,
4) 基于数据库db创建rdd
5) 基于Nosql创建rdd,如hbase
6) 基于s3创建rdd,
7) 基于数据流,如socket创建rdd
1.8 Spark的RDD DataFrame和DataSet的区别
RDD的优点:
- 相比于传统的MapReduce框架,Spark在RDD中内置很多函数操作,group,map,filter等,方便处理结构化或非结构化数据。
- 面向对象编程,直接存储的java对象,类型转化也安全
RDD的缺点:
- 由于它基本和hadoop一样万能的,因此没有针对特殊场景的优化,比如对于结构化数据处理相对于sql来比非常麻烦
- 默认采用的是java序列号方式,序列化结果比较大,而且数据存储在java堆内存中,导致gc比较频繁
DataFrame的优点:
结构化数据处理非常方便,支持Avro, CSV, elastic search, and Cassandra等kv数据,也支持HIVE tables, MySQL等传统数据表
有针对性的优化,如采用Kryo序列化,由于数据结构元信息spark已经保存,序列化时不需要带上元信息,大大的减少了序列化大小,而且数据保存在堆外内存中,减少了gc次数,所以运行更快。
hive兼容,支持hql、udf等
DataFrame的缺点:
- 编译时不能类型转化安全检查,运行时才能确定是否有问题
- 对于对象支持不友好,rdd内部数据直接以java对象存储,dataframe内存存储的是row对象而不能是自定义对象
DateSet的优点:
DateSet整合了RDD和DataFrame的优点,支持结构化和非结构化数据
和RDD一样,支持自定义对象存储
和DataFrame一样,支持结构化数据的sql查询
采用堆外内存存储,gc友好
类型转化安全,代码友好
如此回答有3个坑(容易引起面试官追问):
1)Spark shuffle 与 MapReduce shuffle(或者Spark 与 MR 的区别)
2)Spark内存模型
3)对gc(垃圾回收)的了解
1.10 Spark 的通信机制
分布式的通信方式
- RPC
- RMI
- JMS
- EJB
- Web Serivice
通信框架Akka
Hadoop MR中的计算框架,jobTracker和TaskTracker间是由于通过heartbeat的方式来进行的通信和传递数据,会导致非常慢的执行速度,而Spark具有出色的高效的Akka和netty通信系统
1.11 Spark的数据容错机制
一般而言,对于分布式系统,数据集的容错性通常有两种方式:
1) 数据检查点(在Spark中对应Checkpoint机制)。
2) 记录数据的更新(在Spark中对应Lineage血统机制)。
对于大数据分析而言,数据检查点操作成本较高,需要通过数据中心的网络连接在机器之间复制庞大的数据集,而网络带宽往往比内存带宽低,同时会消耗大量存储资源。
Spark选择记录更新的方式。但更新粒度过细时,记录更新成本也不低。因此,RDD只支持粗粒度转换,即只记录单个块上执行的单个操作,然后将创建RDD的一系列变换序列记录下来,以便恢复丢失的分区。
Lineage(血统)机制
每个RDD除了包含分区信息外,还包含它从父辈RDD变换过来的步骤,以及如何重建某一块数据的信息,因此RDD的这种容错机制又称“血统”(Lineage)容错。Lineage本质上很类似于数据库中的重做日志(Redo Log),只不过这个重做日志粒度很大,是对全局数据做同样的重做以便恢复数据。
相比其他系统的细颗粒度的内存数据更新级别的备份或者LOG机制,RDD的Lineage记录的是粗颗粒度的特定数据Transformation操作(如filter、map、join等)。当这个RDD的部分分区数据丢失时,它可以通过Lineage获取足够的信息来重新计算和恢复丢失的数据分区。但这种数据模型粒度较粗,因此限制了Spark的应用场景。所以可以说Spark并不适用于所有高性能要求的场景,但同时相比细颗粒度的数据模型,也带来了性能方面的提升。
RDD在Lineage容错方面采用如下两种依赖来保证容错方面的性能:
窄依赖(Narrow Dependeny):窄依赖是指父RDD的每一个分区最多被一个子RDD的分区所用,表现为一个父RDD的分区对应于一个子RDD的分区,或多个父RDD的分区对应于一个子RDD的分区。也就是说一个父RDD的一个分区不可能对应一个子RDD的多个分区。其中,1个父RDD分区对应1个子RDD分区,可以分为如下两种情况:
子RDD分区与父RDD分区一一对应(如map、filter等算子)。一个子RDD分区对应N个父RDD分区(如co-paritioned(协同划分)过的Join)。
宽依赖(Wide Dependency,源码中称为Shuffle Dependency):
宽依赖是指一个父RDD分区对应多个子RDD分区,可以分为如下两种情况:
一个父RDD对应所有子RDD分区(未经协同划分的Join)。
一个父RDD对应多个RDD分区(非全部分区)(如groupByKey)。
窄依赖与宽依赖关系如图3-10所示。
从图3-10可以看出对依赖类型的划分:根据父RDD分区是对应一个还是多个子RDD分区来区分窄依赖(父分区对应一个子分区)和宽依赖(父分区对应多个子分区)。如果对应多个,则当容错重算分区时,对于需要重新计算的子分区而言,只需要父分区的一部分数据,因此其余数据的重算就导致了冗余计算。
图3-10 两种依赖关系
对于宽依赖,Stage计算的输入和输出在不同的节点上,对于输入节点完好,而输出节点死机的情况,在通过重新计算恢复数据的情况下,这种方法容错是有效的,否则无效,因为无法重试,需要向上追溯其祖先看是否可以重试(这就是lineage,血统的意思),窄依赖对于数据的重算开销要远小于宽依赖的数据重算开销。
窄依赖和宽依赖的概念主要用在两个地方:一个是容错中相当于Redo日志的功能;另一个是在调度中构建DAG作为不同Stage的划分点(前面调度机制中已讲过)。
依赖关系在lineage容错中的应用总结如下:
1)窄依赖可以在某个计算节点上直接通过计算父RDD的某块数据计算得到子RDD对应的某块数据;宽依赖则要等到父RDD所有数据都计算完成,并且父RDD的计算结果进行hash并传到对应节点上之后,才能计算子RDD。
2)数据丢失时,对于窄依赖,只需要重新计算丢失的那一块数据来恢复;对于宽依赖,则要将祖先RDD中的所有数据块全部重新计算来恢复。所以在长“血统”链特别是有宽依赖时,需要在适当的时机设置数据检查点(checkpoint机制在下节讲述)。可见Spark在容错性方面要求对于不同依赖关系要采取不同的任务调度机制和容错恢复机制。
在Spark容错机制中,如果一个节点宕机了,而且运算属于窄依赖,则只要重算丢失的父RDD分区即可,不依赖于其他节点。而宽依赖需要父RDD的所有分区都存在,重算就很昂贵了。更深入地来说:在窄依赖关系中,当子RDD的分区丢失,重算其父RDD分区时,父RDD相应分区的所有数据都是子RDD分区的数据,因此不存在冗余计算。而在宽依赖情况下,丢失一个子RDD分区重算的每个父RDD的每个分区的所有数据并不是都给丢失的子RDD分区使用,其中有一部分数据对应的是其他不需要重新计算的子RDD分区中的数据,因此在宽依赖关系下,这样计算就会产生冗余开销,这也是宽依赖开销更大的原因。为了减少这种冗余开销,通常在Lineage血统链比较长,并且含有宽依赖关系的容错中使用Checkpoint机制设置检查点。
Checkpoint(检查点)机制
通过上述分析可以看出Checkpoint的本质是将RDD写入Disk来作为检查点。这种做法是为了通过lineage血统做容错的辅助,lineage过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果之后有节点出现问题而丢失分区,从做检查点的RDD开始重做Lineage,就会减少开销。
1.13 Spark性能调优
1) 常用参数说明
--driver-memory 4g : driver内存大小,一般没有广播变量(broadcast)时,设置4g足够,如果有广播变量,视情况而定,可设置6G,8G,12G等均可
--executor-memory 4g : 每个executor的内存,正常情况下是4g足够,但有时处理大批量数据时容易内存不足,再多申请一点,如6G
--num-executors 15 : 总共申请的executor数目,普通任务十几个或者几十个足够了,若是处理海量数据如百G上T的数据时可以申请多一些,100,200等
--executor-cores 2 : 每个executor内的核数,即每个executor中的任务task数目,此处设置为2,即2个task共享上面设置的6g内存,每个map或reduce任务的并行度是executor数目*executor中的任务数
yarn集群中一般有资源申请上限,如,executor-memory*num-executors < 400G 等,所以调试参数时要注意这一点
—-spark.default.parallelism 200 : Spark作业的默认为500~1000个比较合适,如果不设置,spark会根据底层HDFS的block数量设置task的数量,这样会导致并行度偏少,资源利用不充分。该参数设为num-executors * executor-cores的2~3倍比较合适。
-- spark.storage.memoryFraction 0.6 : 设置RDD持久化数据在Executor内存中能占的最大比例。默认值是0.6
—-spark.shuffle.memoryFraction 0.2 : 设置shuffle过程中一个task拉取到上个stage的task的输出后,进行聚合操作时能够使用的Executor内存的比例,默认是0.2,如果shuffle聚合时使用的内存超出了这个20%的限制,多余数据会被溢写到磁盘文件中去,降低shuffle性能
—-spark.yarn.executor.memoryOverhead 1G : executor执行的时候,用的内存可能会超过executor-memory,所以会为executor额外预留一部分内存,spark.yarn.executor.memoryOverhead即代表这部分内存
2) Spark常用编程建议
避免创建重复的RDD,尽量复用同一份数据。
尽量避免使用shuffle类算子,因为shuffle操作是spark中最消耗性能的地方,reduceByKey、join、distinct、repartition等算子都会触发shuffle操作,尽量使用map类的非shuffle算子
用aggregateByKey和reduceByKey替代groupByKey,因为前两个是预聚合操作,会在每个节点本地对相同的key做聚合,等其他节点拉取所有节点上相同的key时,会大大减少磁盘IO以及网络开销。
repartition适用于RDD[V], partitionBy适用于RDD[K, V]
mapPartitions操作替代普通map,foreachPartitions替代foreach
filter操作之后进行coalesce操作,可以减少RDD的partition数量
如果有RDD复用,尤其是该RDD需要花费比较长的时间,建议对该RDD做cache,若该RDD每个partition需要消耗很多内存,建议开启Kryo序列化机制(据说可节省2到5倍空间),若还是有比较大的内存开销,可将storage_level设置为MEMORY_AND_DISK_SER
尽量避免在一个Transformation中处理所有的逻辑,尽量分解成map、filter之类的操作
多个RDD进行union操作时,避免使用rdd.union(rdd).union(rdd).union(rdd)这种多重union,rdd.union只适合2个RDD合并,合并多个时采用SparkContext.union(Array(RDD)),避免union嵌套层数太多,导致的调用链路太长,耗时太久,且容易引发StackOverFlow
spark中的Group/join/XXXByKey等操作,都可以指定partition的个数,不需要额外使用repartition和partitionBy函数
尽量保证每轮Stage里每个task处理的数据量>128M
如果2个RDD做join,其中一个数据量很小,可以采用Broadcast Join,将小的RDD数据collect到driver内存中,将其BroadCast到另外以RDD中,其他场景想优化后面会讲
2个RDD做笛卡尔积时,把小的RDD作为参数传入,如BigRDD.certesian(smallRDD)
若需要Broadcast一个大的对象到远端作为字典查询,可使用多executor-cores,大executor-memory。若将该占用内存较大的对象存储到外部系统,executor-cores=1, executor-memory=m(默认值2g),可以正常运行,那么当大字典占用空间为size(g)时,executor-memory为2*size,executor-cores=size/m(向上取整)
如果对象太大无法BroadCast到远端,且需求是根据大的RDD中的key去索引小RDD中的key,可使用zipPartitions以hash join的方式实现,具体原理参考下一节的shuffle过程
如果需要在repartition重分区之后还要进行排序,可直接使用repartitionAndSortWithinPartitions,比分解操作效率高,因为它可以一边shuffle一边排序
3) shuffle性能优化
3.1 什么是shuffle操作
spark中的shuffle操作功能:将分布在集群中多个节点上的同一个key,拉取到同一个节点上,进行聚合或join操作,类似洗牌的操作。这些分布在各个存储节点上的数据重新打乱然后汇聚到不同节点的过程就是shuffle过程。
3.2 哪些操作中包含shuffle操作
RDD的特性是不可变的带分区的记录集合,Spark提供了Transformation和Action两种操作RDD的方式。Transformation是生成新的RDD,包括map, flatMap, filter, union, sample, join, groupByKey, cogroup, ReduceByKey, cros, sortByKey, mapValues等;Action只是返回一个结果,包括collect,reduce,count,save,lookupKey等
Spark所有的算子操作中是否使用shuffle过程要看计算后对应多少分区:
- 若一个操作执行过程中,结果RDD的每个分区只依赖上一个RDD的同一个分区,即属于窄依赖,如map、filter、union等操作,这种情况是不需要进行shuffle的,同时还可以按照pipeline的方式,把一个分区上的多个操作放在同一个Task中进行
- 若结果RDD的每个分区需要依赖上一个RDD的全部分区,即属于宽依赖,如repartition相关操作(repartition,coalesce)、*ByKey操作(groupByKey,ReduceByKey,combineByKey、aggregateByKey等)、join相关操作(cogroup,join)、distinct操作,这种依赖是需要进行shuffle操作的
3.3 shuffle操作过程
shuffle过程分为shuffle write和shuffle read两部分
- shuffle write: 分区数由上一阶段的RDD分区数控制,shuffle write过程主要是将计算的中间结果按某种规则临时放到各个executor所在的本地磁盘上(当前stage结束之后,每个task处理的数据按key进行分类,数据先写入内存缓冲区,缓冲区满,溢写spill到磁盘文件,最终相同key被写入同一个磁盘文件)创建的磁盘文件数量=当前stage中task数量*下一个stage的task数量
- shuffle read:从上游stage的所有task节点上拉取属于自己的磁盘文件,每个read task会有自己的buffer缓冲,每次只能拉取与buffer缓冲相同大小的数据,然后聚合,聚合完一批后拉取下一批,边拉取边聚合。分区数由Spark提供的一些参数控制,如果这个参数值设置的很小,同时shuffle read的数据量很大,会导致一个task需要处理的数据非常大,容易发生JVM crash,从而导致shuffle数据失败,同时executor也丢失了,就会看到Failed to connect to host 的错误(即executor lost)。
shuffle过程中,各个节点会通过shuffle write过程将相同key都会先写入本地磁盘文件中,然后其他节点的shuffle read过程通过网络传输拉取各个节点上的磁盘文件中的相同key。这其中大量数据交换涉及到的网络传输和文件读写操作是shuffle操作十分耗时的根本原因
3.4 spark的shuffle类型
参数spark.shuffle.manager用于设置ShuffleManager的类型。Spark1.5以后,该参数有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark1.2以前的默认值,Spark1.2之后的默认值都是SortShuffleManager。tungsten-sort与sort类似,但是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。
由于SortShuffleManager默认会对数据进行排序,因此如果业务需求中需要排序的话,使用默认的SortShuffleManager就可以;但如果不需要排序,可以通过bypass机制或设置HashShuffleManager避免排序,同时也能提供较好的磁盘读写性能。
HashShuffleManager流程:
SortShuffleManager流程:
3.5 如何开启bypass机制
bypass机制通过参数spark.shuffle.sort.bypassMergeThreshold设置,默认值是200,表示当ShuffleManager是SortShuffleManager时,若shuffle read task的数量小于这个阈值(默认200)时,则shuffle write过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式写数据,但最后会将每个task产生的所有临时磁盘文件合并成一个文件,并创建索引文件。
这里给出的调优建议是,当使用SortShuffleManager时,如果的确不需要排序,可以将这个参数值调大一些,大于shuffle read task的数量。那么此时就会自动开启bypass机制,map-side就不会进行排序了,减少排序的性能开销,提升shuffle操作效率。但这种方式并没有减少shuffle write过程产生的磁盘文件数量,所以写的性能没有改变。
3.6 HashShuffleManager优化建议
如果使用HashShuffleManager,可以设置spark.shuffle.consolidateFiles参数。该参数默认为false,只有当使用HashShuffleManager且该参数设置为True时,才会开启consolidate机制,大幅度合并shuffle write过程产生的输出文件,对于shuffle read task 数量特别多的情况下,可以极大地减少磁盘IO开销,提升shuffle性能。参考社区同学给出的数据,consolidate性能比开启bypass机制的SortShuffleManager高出10% ~ 30%。
3.7 shuffle调优建议
除了上述的几个参数调优,shuffle过程还有一些参数可以提高性能:
- spark.shuffle.file.buffer : 默认32M,shuffle Write阶段写文件时的buffer大小,若内存资源比较充足,可适当将其值调大一些(如64M),减少executor的IO读写次数,提高shuffle性能
- spark.shuffle.io.maxRetries : 默认3次,Shuffle Read阶段取数据的重试次数,若shuffle处理的数据量很大,可适当将该参数调大。
3.8 shuffle操作过程中的常见错误
SparkSQL中的shuffle错误:
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
org.apache.spark.shuffle.FetchFailedException:Failed to connect to hostname/192.168.xx.xxx:50268
RDD中的shuffle错误:
WARN TaskSetManager: Lost task 17.1 in stage 4.1 (TID 1386, spark050013): java.io.FileNotFoundException: /data04/spark/tmp/blockmgr-817d372f-c359-4a00-96dd-8f6554aa19cd/2f/temp_shuffle_e22e013a-5392-4edb-9874-a196a1dad97c (没有那个文件或目录)
FetchFailed(BlockManagerId(6083b277-119a-49e8-8a49-3539690a2a3f-S155, spark050013, 8533), shuffleId=1, mapId=143, reduceId=3, message=
org.apache.spark.shuffle.FetchFailedException: Error in opening FileSegmentManagedBuffer{file=/data04/spark/tmp/blockmgr-817d372f-c359-4a00-96dd-8f6554aa19cd/0e/shuffle_1_143_0.data, offset=997061, length=112503}
处理shuffle类操作的注意事项:
- 减少shuffle数据量:在shuffle前过滤掉不必要的数据,只选取需要的字段处理
- 针对SparkSQL和DataFrame的join、group by等操作:可以通过 spark.sql.shuffle.partitions控制分区数,默认设置为200,可根据shuffle的量以及计算的复杂度提高这个值,如2000等
- RDD的join、group by、reduceByKey等操作:通过spark.default.parallelism控制shuffle read与reduce处理的分区数,默认为运行任务的core总数,官方建议为设置成运行任务的core的2~3倍
- 提高executor的内存:即spark.executor.memory的值
- 分析数据验证是否存在数据倾斜的问题:如空值如何处理,异常数据(某个key对应的数据量特别大)时是否可以单独处理,可以考虑自定义数据分区规则,如何自定义可以参考下面的join优化环节
4) join性能优化
Spark所有的操作中,join操作是最复杂、代价最大的操作,也是大部分业务场景的性能瓶颈所在。所以针对join操作的优化是使用spark必须要学会的技能。
spark的join操作也分为Spark SQL的join和Spark RDD的join。
4.1 Spark SQL 的join操作
4.1.1 Hash Join
Hash Join的执行方式是先将小表映射成Hash Table的方式,再将大表使用相同方式映射到Hash Table,在同一个hash分区内做join匹配。
hash join又分为broadcast hash join和shuffle hash join两种。其中Broadcast hash join,顾名思义,就是把小表广播到每一个节点上的内存中,大表按Key保存到各个分区中,小表和每个分区的大表做join匹配。这种情况适合一个小表和一个大表做join且小表能够在内存中保存的情况。如下图所示:
当Hash Join不能适用的场景就需要Shuffle Hash Join了,Shuffle Hash Join的原理是按照join Key分区,key相同的数据必然分配到同一分区中,将大表join分而治之,变成小表的join,可以提高并行度。执行过程也分为两个阶段:
- shuffle阶段:分别将两个表按照join key进行分区,将相同的join key数据重分区到同一节点
- hash join阶段:每个分区节点上的数据单独执行单机hash join算法
Shuffle Hash Join的过程如下图所示:
4.1.2 Sort-Merge Join
SparkSQL针对两张大表join的情况提供了全新的算法——Sort-merge join,整个过程分为三个步骤:
- Shuffle阶段:将两张大表根据join key进行重新分区,两张表数据会分布到整个集群,以便分布式进行处理
- sort阶段:对单个分区节点的两表数据,分别进行排序
- merge阶段:对排好序的两张分区表数据执行join操作。分别遍历两个有序序列,遇到相同的join key就merge输出,否则继续取更小一边的key,即合并两个有序列表的方式。
sort-merge join流程如下图所示。
4.2 Spark RDD的join操作
Spark的RDD join没有上面这么多的分类,但是面临的业务需求是一样的。如果是大表join小表的情况,则可以将小表声明为broadcast变量,使用map操作快速实现join功能,但又不必执行Spark core中的join操作。
如果是两个大表join,则必须依赖Spark Core中的join操作了。Spark RDD Join的过程可以自行阅读源码了解,这里只做一个大概的讲解。
spark的join过程中最核心的函数是cogroup方法,这个方法中会判断join的两个RDD所使用的partitioner是否一样,如果分区相同,即存在OneToOneDependency依赖,不用进行hash分区,可直接join;如果要关联的RDD和当前RDD的分区不一致时,就要对RDD进行重新hash分区,分到正确的分区中,即存在ShuffleDependency,需要先进行shuffle操作再join。因此提升join效率的一个思路就是使得两个RDD具有相同的partitioners。
所以针对Spark RDD的join操作的优化建议是:
- 如果需要join的其中一个RDD比较小,可以直接将其存入内存,使用broadcast hash join
- 在对两个RDD进行join操作之前,使其使用同一个partitioners,避免join操作的shuffle过程
- 如果两个RDD其一存在重复的key也会导致join操作性能变低,因此最好先进行key值的去重处理
4.3 数据倾斜优化
均匀数据分布的情况下,前面所说的优化建议就足够了。但存在数据倾斜时,仍然会有性能问题。主要体现在绝大多数task执行得都非常快,个别task执行很慢,拖慢整个任务的执行进程,甚至可能因为某个task处理的数据量过大而爆出OOM错误。
shuffle操作中需要将各个节点上相同的key拉取到某一个节点上的一个task处理,如果某个key对应的数据量特别大,就会发生数据倾斜。
4.3.1 分析数据分布
如果是Spark SQL中的group by、join语句导致的数据倾斜,可以使用SQL分析执行SQL中的表的key分布情况;如果是Spark RDD执行shuffle算子导致的数据倾斜,可以在Spark作业中加入分析Key分布的代码,使用countByKey()统计各个key对应的记录数。
4.3.2 数据倾斜的解决方案
这里参考美团技术博客中给出的几个方案。
1)针对hive表中的数据倾斜,可以尝试通过hive进行数据预处理,如按照key进行聚合,或是和其他表join,Spark作业中直接使用预处理后的数据。
2)如果发现导致倾斜的key就几个,而且对计算本身的影响不大,可以考虑过滤掉少数导致倾斜的key
3)设置参数spark.sql.shuffle.partitions,提高shuffle操作的并行度,增加shuffle read task的数量,降低每个task处理的数据量
4)针对RDD执行reduceByKey等聚合类算子或是在Spark SQL中使用group by语句时,可以考虑两阶段聚合方案,即局部聚合+全局聚合。第一阶段局部聚合,先给每个key打上一个随机数,接着对打上随机数的数据执行reduceByKey等聚合操作,然后将各个key的前缀去掉。第二阶段全局聚合即正常的聚合操作。
5)针对两个数据量都比较大的RDD/hive表进行join的情况,如果其中一个RDD/hive表的少数key对应的数据量过大,另一个比较均匀时,可以先分析数据,将数据量过大的几个key统计并拆分出来形成一个单独的RDD,得到的两个RDD/hive表分别和另一个RDD/hive表做join,其中key对应数据量较大的那个要进行key值随机数打散处理,另一个无数据倾斜的RDD/hive表要1对n膨胀扩容n倍,确保随机化后key值仍然有效。
6)针对join操作的RDD中有大量的key导致数据倾斜,对有数据倾斜的整个RDD的key值做随机打散处理,对另一个正常的RDD进行1对n膨胀扩容,每条数据都依次打上0~n的前缀。处理完后再执行join操作
5) 其他错误总结
(1) 报错信息
java.lang.OutOfMemory, unable to create new native thread
Caused by: java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:640)
解决方案:
上面这段错误提示的本质是Linux操作系统无法创建更多进程,导致出错,并不是系统的内存不足。因此要解决这个问题需要修改Linux允许创建更多的进程,就需要修改Linux最大进程数
(2)报错信息
由于Spark在计算的时候会将中间结果存储到/tmp目录,而目前linux又都支持tmpfs,其实就是将/tmp目录挂载到内存当中, 那么这里就存在一个问题,中间结果过多导致/tmp目录写满而出现如下错误
No Space Left on the device(Shuffle临时文件过多)
解决方案:
修改配置文件spark-env.sh,把临时文件引入到一个自定义的目录中去, 即:
export SPARK_LOCAL_DIRS=/home/utoken/datadir/spark/tmp
(3)报错信息
Worker节点中的work目录占用许多磁盘空间, 这些是Driver上传到worker的文件, 会占用许多磁盘空间
解决方案:
需要定时做手工清理work目录
(4)spark-shell提交Spark Application如何解决依赖库
解决方案:
利用–driver-class-path选项来指定所依赖的jar文件,注意的是–driver-class-path后如果需要跟着多个jar文件的话,jar文件之间使用冒号:来分割。
(5)内存不足或数据倾斜导致Executor Lost,shuffle fetch失败,Task重试失败等(spark-submit提交)
TaskSetManager: Lost task 1.0 in stage 6.0 (TID 100, 192.168.10.37): java.lang.OutOfMemoryError: Java heap space
INFO BlockManagerInfo: Added broadcast_8_piece0 in memory on 192.168.10.37:57139 (size: 42.0 KB, free: 24.2 MB)
INFO BlockManagerInfo: Added broadcast_8_piece0 in memory on 192.168.10.38:53816 (size: 42.0 KB, free: 24.2 MB)
INFO TaskSetManager: Starting task 3.0 in stage 6.0 (TID 102, 192.168.10.37, ANY, 2152 bytes)
解决方案:
增加worker内存,或者相同资源下增加partition数目,这样每个task要处理的数据变少,占用内存变少
如果存在shuffle过程,设置shuffle read阶段的并行数
2. SparkSQL
2.1 Spark SQL 的原理和运行机制
从上图可见,无论是直接使用 SQL 语句还是使用 DataFrame,都会经过如下步骤转换成 DAG 对 RDD 的操作
- Parser 解析 SQL,生成 Unresolved Logical Plan
- 由 Analyzer 结合 Catalog 信息生成 Resolved Logical Plan
- Optimizer根据预先定义好的规则对 Resolved Logical Plan 进行优化并生成 Optimized Logical Plan
- Query Planner 将 Optimized Logical Plan 转换成多个 Physical Plan
- CBO 根据 Cost Model 算出每个 Physical Plan 的代价并选取代价最小的 Physical Plan 作为最终的 Physical Plan
- Spark 以 DAG 的方法执行上述 Physical Plan
- 在执行 DAG 的过程中,Adaptive Execution 根据运行时信息动态调整执行计划从而提高执行效率
Parser
Spark SQL 使用 Antlr 进行记法和语法解析,并生成 UnresolvedPlan。
当用户使用 SparkSession.sql(sqlText : String) 提交 SQL 时,SparkSession 最终会调用 SparkSqlParser 的 parsePlan 方法。该方法分两步
- 使用 Antlr 生成的 SqlBaseLexer 对 SQL 进行词法分析,生成 CommonTokenStream
- 使用 Antlr 生成的 SqlBaseParser 进行语法分析,得到 LogicalPlan
Analyzer
从 Analyzer 的构造方法可见
Analyzer 持有一个 SessionCatalog 对象的引用
Analyzer 继承自 RuleExecutor[LogicalPlan],因此可对 LogicalPlan 进行转换
Optimizer
Spark SQL 目前的优化主要是基于规则的优化,即 RBO (Rule-based optimization)
- 每个优化以 Rule 的形式存在,每条 Rule 都是对 Analyzed Plan 的等价转换
- RBO 设计良好,易于扩展,新的规则可以非常方便地嵌入进 Optimizer
- RBO 目前已经足够好,但仍然需要更多规则来 cover 更多的场景
- 优化思路主要是减少参与计算的数据量以及计算本身的代价
PushdownPredicate
PushdownPredicate 是最常见的用于减少参与计算的数据量的方法。
SparkPlanner
得到优化后的 LogicalPlan 后,SparkPlanner 将其转化为 SparkPlan 即物理计划。
本例中由于 score 表数据量较小,Spark 使用了 BroadcastJoin。因此 score 表经过 Filter 后直接使用 BroadcastExchangeExec 将数据广播出去,然后结合广播数据对 people 表使用 BroadcastHashJoinExec 进行 Join。再经过 Project 后使用 HashAggregateExec 进行分组聚合。
至此,一条 SQL 从提交到解析、分析、优化以及执行的完整过程就介绍完毕。
2.3 Spark SQL 的优化策略
1)内存列式存储与内存缓存表
Spark SQL可以通过cacheTable将数据存储转换为列式存储,同时将数据加载到内存缓存。cacheTable相当于在分布式集群的内存物化视图,将数据缓存,这样迭代的或者交互式的查询不用再从HDFS读数据,直接从内存读取数据大大减少了I/O开销。列式存储的优势在于Spark SQL只需要读出用户需要的列,而不需要像行存储那样每次都将所有列读出,从而大大减少内存缓存数据量,更高效地利用内存数据缓存,同时减少网络传输和I/O开销。数据按照列式存储,由于是数据类型相同的数据连续存储,所以能够利用序列化和压缩减少内存空间的占用。
2)列存储压缩
为了减少内存和硬盘空间占用,Spark SQL采用了一些压缩策略对内存列存储数据进行压缩。Spark SQL的压缩方式要比Shark丰富很多,如它支持PassThrough、RunLengthEncoding、DictionaryEncoding、BooleanBitSet、IntDelta、LongDelta等多种压缩方式,这样能够大幅度减少内存空间占用、网络传输和I/O开销。
3)逻辑查询优化
SparkSQL在逻辑查询优化(见图8-4)上支持列剪枝、谓词下压、属性合并等逻辑查询优化方法。列剪枝为了减少读取不必要的属性列、减少数据传输和计算开销,在查询优化器进行转换的过程中会优化列剪枝。
下面介绍一个逻辑优化的例子。
SELECT Class FROM (SELECT ID,Name,Class FROM STUDENT ) S WHERE S.ID=1
Catalyst将原有查询通过谓词下压,将选择操作ID=1优先执行,这样过滤大部分数据,通过属性合并将最后的投影只做一次,最终保留Class属性列。
4)Join优化
Spark SQL深度借鉴传统数据库的查询优化技术的精髓,同时在分布式环境下调整和创新特定的优化策略。现在Spark SQL对Join进行了优化,支持多种连接算法,现在的连接算法已经比Shark丰富,而且很多原来Shark的元素也逐步迁移过来,如BroadcastHashJoin、BroadcastNestedLoopJoin、HashJoin、LeftSemiJoin,等等。
下面介绍其中的一个Join算法。
BroadcastHashJoin将小表转化为广播变量进行广播,这样避免Shuffle开销,最后在分区内做Hash连接。这里使用的就是Hive中Map Side Join的思想,同时使用DBMS中的Hash连接算法做连接。 随着Spark SQL的发展,未来会有更多的查询优化策略加入进来,同时后续Spark SQL会支持像Shark Server一样的服务端和JDBC接口,兼容更多的持久化层,如NoSQL、传统的DBMS等。一个强有力的结构化大数据查询引擎正在崛起。
3. SparkStreaming
3.1 原理剖析(源码级别)和运行机制
3.2 Spark Dstream 及其 API 操作
3.3 Spark Streaming 消费 Kafka 的两种方式
3.4 Spark 消费 Kafka 消息的 Offset 处理
3.5 窗口操作
4. SparkMlib
可实现聚类、分类、推荐等算法
三. Flink
- Flink 集群的搭建
- Flink 的架构原理
- Flink 的编程模型
- Flink 集群的 HA 配置
- Flink DataSet 和 DataSteam API
- 序列化
- Flink 累加器
- 状态 State 的管理和恢复
- 窗口和时间
- 并行度
- Flink 和消息中间件 Kafka 的结合
- Flink Table 和 SQL 的原理和用法
四. Kafka
1. Kafka 的设计
Kafka 将消息以 topic 为单位进行归纳
将向 Kafka topic 发布消息的程序成为 producers.
将预订 topics 并消费消息的程序成为 consumer.
Kafka 以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个 broker.
producers 通过网络将消息发送到 Kafka 集群,集群向消费者提供消息
2. 数据传输的三种事务定义
数据传输的事务定义通常有以下三种级别:
(1)最多一次: 消息不会被重复发送,最多被传输一次,但也有可能一次不传输
(2)最少一次: 消息不会被漏发送,最少被传输一次,但也有可能被重复传输.
(3)精确的一次(Exactly once): 不会漏传输也不会重复传输,每个消息都传输被一次而
且仅仅被传输一次,这是大家所期望的
3. Kafka 判断一个节点是否活着两大条件
(1)节点必须可以维护和 ZooKeeper 的连接,Zookeeper 通过心跳机制检查每个节点的连
接
(2)如果节点是个 follower,他必须能及时的同步 leader 的写操作,延时不能太久
4. Kafa consumer 是否可以消费指定分区消息?
Kafa consumer 消费消息时,向 broker 发出”fetch”请求去消费特定分区的消息,consumer
指定消息在日志中的偏移量(offset),就可以消费从这个位置开始的消息,customer 拥有
了 offset 的控制权,可以向后回滚去重新消费之前的消息,这是很有意义的
5. Kafka 消息是采用 Pull 模式or Push 模式?
Kafka 最初考虑的问题是,customer 应该从 brokes 拉取消息还是 brokers 将消息推送到
consumer,也就是 pull 还 push。在这方面,Kafka 遵循了一种大部分消息系统共同的传统
的设计:producer 将消息推送到 broker,consumer 从 broker 拉取消息
一些消息系统比如 Scribe 和 Apache Flume 采用了 push 模式,将消息推送到下游的
consumer。这样做有好处也有坏处:由 broker 决定消息推送的速率,对于不同消费速率的
consumer 就不太好处理了。消息系统都致力于让 consumer 以最大的速率最快速的消费消
息,但不幸的是,push 模式下,当 broker 推送的速率远大于 consumer 消费的速率时,
consumer 恐怕就要崩溃了。最终 Kafka 还是选取了传统的 pull 模式
Pull 模式的另外一个好处是 consumer 可以自主决定是否批量的从 broker 拉取数据。Push
模式必须在不知道下游 consumer 消费能力和消费策略的情况下决定是立即推送每条消息还
是缓存之后批量推送。如果为了避免 consumer 崩溃而采用较低的推送速率,将可能导致一
次只推送较少的消息而造成浪费。Pull 模式下,consumer 就可以根据自己的消费能力去决
定这些策略
Pull 有个缺点是,如果 broker 没有可供消费的消息,将导致 consumer 不断在循环中轮询,
直到新消息到 t 达。为了避免这点,Kafka 有个参数可以让 consumer 阻塞知道新消息到达
(当然也可以阻塞知道消息的数量达到某个特定的量这样就可以批量发
6. Kafka 存储在硬盘上的消息格式是什么?
消息由一个固定长度的头部和可变长度的字节数组组成。头部包含了一个版本号和 CRC32
校验码。
- 消息长度: 4 bytes (value: 1+4+n)
- 版本号: 1 byte
- CRC 校验码: 4 bytes
- 具体的消息: n bytes
7. Kafka 高效文件存储设计特点
(1).Kafka 把 topic 中一个 parition 大文件分成多个小文件段,通过多个小文件段,就容易定
期清除或删除已经消费完文件,减少磁盘占用。
(2).通过索引信息可以快速定位 message 和确定 response 的最大大小。
(3).通过 index 元数据全部映射到 memory,可以避免 segment file 的 IO 磁盘操作。
(4).通过索引文件稀疏存储,可以大幅降低 index 文件元数据占用空间大小。
8. Kafka 与传统消息系统之间有三个关键区别
(1).Kafka 持久化日志,这些日志可以被重复读取和无限期保留
(2).Kafka 是一个分布式系统:它以集群的方式运行,可以灵活伸缩,在内部通过复制数据
提升容错能力和高可用性
(3).Kafka 支持实时的流式处理
9. Kafka 创建 Topic 时如何将分区放置到不同的 Broker 中
- 副本因子不能大于 Broker 的个数;
- 第一个分区(编号为 0)的第一个副本放置位置是随机从 brokerList 选择的;
- 其他分区的第一个副本放置位置相对于第 0 个分区依次往后移。也就是如果我们有 5 个Broker,5 个分区,假设第一个分区放在第四个 Broker 上,那么第二个分区将会放在第五个 Broker 上;第三个分区将会放在第一个 Broker 上;第四个分区将会放在第二个Broker 上,依次类推;
- 剩余的副本相对于第一个副本放置位置其实是由 nextReplicaShift 决定的,而这个数也是随机产生的
10. Kafka 新建的分区会在哪个目录下创建
在启动 Kafka 集群之前,我们需要配置好 log.dirs 参数,其值是 Kafka 数据的存放目录,
这个参数可以配置多个目录,目录之间使用逗号分隔,通常这些目录是分布在不同的磁盘
上用于提高读写性能。
当然我们也可以配置 log.dir 参数,含义一样。只需要设置其中一个即可。
如果 log.dirs 参数只配置了一个目录,那么分配到各个 Broker 上的分区肯定只能在这个
目录下创建文件夹用于存放数据。
但是如果 log.dirs 参数配置了多个目录,那么 Kafka 会在哪个文件夹中创建分区目录呢?
答案是:Kafka 会在含有分区目录最少的文件夹中创建新的分区目录,分区目录名为 Topic
名+分区 ID。注意,是分区文件夹总数最少的目录,而不是磁盘使用量最少的目录!也就
是说,如果你给 log.dirs 参数新增了一个新的磁盘,新的分区目录肯定是先在这个新的磁
盘上创建直到这个新的磁盘目录拥有的分区目录不是最少为止。
11. partition 的数据如何保存到硬盘
topic 中的多个 partition 以文件夹的形式保存到 broker,每个分区序号从 0 递增,
且消息有序
Partition 文件下有多个 segment(xxx.index,xxx.log)
segment 文件里的 大小和配置文件大小一致可以根据要求修改 默认为 1g
如果大小大于 1g 时,会滚动一个新的 segment 并且以上一个 segment 最后一条消息的偏移
量命名
12. kafka 的 ack 机制
request.required.acks 有三个值 0 1 -1
0:生产者不会等待 broker 的 ack,这个延迟最低但是存储的保证最弱当 server 挂掉的时候
就会丢数据
1:服务端会等待 ack 值 leader 副本确认接收到消息后发送 ack 但是如果 leader 挂掉后他
不确保是否复制完成新 leader 也会导致数据丢失
-1:同样在 1 的基础上 服务端会等所有的 follower 的副本受到数据后才会受到 leader 发出
的 ack,这样数据不会丢失
13. Kafka 的消费者如何消费数据
消费者每次消费数据的时候,消费者都会记录消费的物理偏移量(offset)的位置
等到下次消费时,他会接着上次位置继续消费
14. 消费者负载均衡策略
一个消费者组中的一个分片对应一个消费者成员,他能保证每个消费者成员都能访问,如
果组中成员太多会有空闲的成员
15. 数据有序
一个消费者组里它的内部是有序的
消费者组与消费者组之间是无序的
16. kafaka 生产数据时数据的分组策略
生产者决定数据产生到集群的哪个 partition 中
每一条消息都是以(key,value)格式
Key 是由生产者发送数据传入
所以生产者(key)决定了数据产生到集群的哪个 partition
五. 数据仓库
5.1 数仓概念相关
1. 数据仓库、数据集市、数据库之间的区别
数据仓库 :数据仓库是一个面向主题的、集成的、随时间变化的、但信息本身相对稳定的数据集合,用于对管理决策过程的支持。是企业级的,能为整个企业各个部门的运行提供决策支持手段;
数据集市:则是一种微型的数据仓库,它通常有更少的数据,更少的主题区域,以及更少的历史数据,因此是部门级的,一般只能为某个局部范围内的管理人员服务,因此也称之为部门级数据仓库。
数据库:是一种软件,用来实现数据库逻辑过程,属于物理层;
数据仓库是数据库概念的升级,从数据量来说,数据仓库要比数据库更庞大德多,主要用于数据挖掘和数据分析,辅助领导做决策
只是数据库内的数据时限要远远的长于操作型环境中的数据时限。在操作型环境中一般只保存有60
90天的数据,而在数据仓库中则要需要保存较长时限的数据(例如:510年),以适应DSS进行趋势分析的要求。
2. OLAP、OLTP概念及用途
OLAP: 即
On-Line Analysis Processing
在线分析处理。OLAP的特点:联机分析处理的主要特点,是直接仿照用户的多角度思考模式,预先为用户组建多维的数据模型,维指的是用户的分析角度。
OLTP: 即
On-Line Transaction Processing
联机事务处理过程(OLTP)OLTP的特点:结构复杂、实时性要求高。
OLAP和OLTP区别
1、基本含义不同:OLTP是传统的关系型数据库的bai主要应用,主要是基本的、日常的事务处理,记du录即时的增、删、改、查,比如在银行存取一笔款,就是一个事务交易。OLAP即联机分析处理,是数据仓库的核心部心,支持复杂的分析操作,侧重决策支持,并且提供直观易懂的查询结果。典型的应用就是复杂的动态报表系统。
2、实时性要求不同:OLTP实时性要求高,OLTP 数据库旨在使事务应用程序仅写入所需的数据,以便尽快处理单个事务。OLAP的实时性要求不是很高,很多应用顶多是每天更新一下数据。
3、数据量不同:OLTP数据量不是很大,一般只读/写数十条记录,处理简单的事务。OLAP数据量大,因为OLAP支持的是动态查询,所以用户也许要通过将很多数据的统计后才能得到想要知道的信息,例如时间序列分析等等,所以处理的数据量很大。
4、用户和系统的面向性不同:OLTP是面向顾客的,用于事务和查询处理。OLAP是面向市场的,用于数据分析。
5、数据库设计不同:OLTP采用实体-联系ER模型和面向应用的数据库设计。OLAP采用星型或雪花模型和面向主题的数据库设计。
3. 事实表、维度表、拉链表概念及区别
- 事实表:事实表其实质就是通过各种维度和一些指标值得组合来确定一个事实的,比如通过时间维度,地域组织维度,指标值可以去确定在某时某地的一些指标值怎么样的事实。事实表的每一条数据都是几条维度表的数据和指标值交汇而得到的。每行数据代表一个业务事件,(下单、支付、退款、评价等) 。“事实”这个术语表示的是业务事件的度量值(可统计次数、个数、金额等)
- 事务型事实表
- 周期快照型事实表
- 累积快照型事实表
维度表:维度表可以看成是用户用来分析一个事实的窗口,它里面的数据应该是对事实的各个方面描述,比如时间维度表,它里面的数据就是一些日,周,月,季,年,日期等数据,维度表只能是事实表的一个分析角度。
- 拉链表:拉链表,它是一种维护历史状态,以及最新状态数据的一种表。拉链表也是分区表,有些不变的数据或者是已经达到状态终点的数据就会把它放在分区里面,分区字段一般为开始时间:start_date和结束时间:end_date。一般在该天有效的数据,它的end_date是大于等于该天的日期的。获取某一天全量的数据,可以通过表中的start_date和end_date来做筛选,选出固定某一天的数据。例如我想取截止到20190813的全量数据,其where过滤条件就是where start_date<=’20190813’ and end_date>=20190813。
拉链表使用场景和实现方式?
【yy总结】
拉链表使用场景:需要查看历史某一时间节点的状态,同时考虑到存储空间。
实现方式:
首先是拉链表dw_order_his的设置,有start_date和end_date两个字段;
其次在ods层创建一个ods_order_update表,储存当变化数据(包括insert和update的数据)
源表(order) ods_order_update表和dw_order_his表的交集进行封链操作,end_date=current_date
ods_oder_update数据插入到his表中,对于记录的end_date=9999-12-31,start_date=current_date
【使用场景】
在数据仓库的数据模型设计过程中,经常会遇到下面这种表的设计:
有一些表的数据量很大,比如一张用户表,大约10亿条记录,50个字段,这种表,即使使用ORC压缩,单张表的存储也会超过100G,在HDFS使用双备份或者三备份的话就更大一些。
表中的部分字段会被update更新操作,如用户联系方式,产品的描述信息,订单的状态等等。
需要查看某一个时间点或者时间段的历史快照信息,比如,查看某一个订单在历史某一个时间点的状态。
表中的记录变化的比例和频率不是很大,比如,总共有10亿的用户,每天新增和发生变化的有200万左右,变化的比例占的很小。
【实现方式】
全量主要数据表加载的策略为每次加载时需要根据主键将目标表的数据与源表数据进行比对,删除目标表中在源数据表中的相关记录,然后将源表数据全部插入目标表。表现在脚本上为先delete相关记录,然后insert所有记录。主表加载策略主要用于大部分主表的加载,比如客户信息等主要数据表。
增量拉链是指每次加载时,将源表数据视为增量抽取后的结果,加载到目标表时需要考虑数据历史情况。一般数据发生变化时关闭旧数据链,然后开新数据链。增量拉链针对的是历史表情况,由于数据仓库中记录了大部分数据历史表变化情况,因此增量拉链加载策略在数据仓库中是使用比较广泛的一种加载策略。通常这种历史表都含有start_date和end_date字段,首先全字段对比源数据和目标表得出真正的增量数据,这里的全字段不包含start_date和end_date字段,然后根据主键对目标表进行关旧链操作,然后对新增数据开新链,这种拉链策略同样可以处理全量数据。
【拉链表性能优化】
拉链表当然也会遇到查询性能的问题,比如说我们存放了5年的拉链数据,那么这张表势必会比较大,当查询的时候性能就比较低了,个人认为两个思路来解决:
- 在一些查询引擎中,我们对start_date和end_date做索引,这样能提高不少性能。
- 保留部分历史数据,比如说我们一张表里面存放全量的拉链表数据,然后再对外暴露一张只提供近3个月数据的拉链表。
4. 全量表、增量表、快照表概念及区别
- 全量表:全量表没有分区,表中的数据是前一天的所有数据,比如说今天是24号,那么全量表里面拥有的数据是23号的所有数据,每次往全量表里面写数据都会覆盖之前的数据,所以全量表不能记录历史的数据情况,只有截止到当前最新的、全量的数据。
- 增量表:增量表,就是记录每天新增数据的表,比如说,从24号到25号新增了那些数据,改变了哪些数据,这些都会存储在增量表的25号分区里面。上面说的快照表的25号分区和24号分区(都是t+1,实际时间分别对应26号和25号),它两的数据相减就是实际时间25号到26号有变化的、增加的数据,也就相当于增量表里面25号分区的数据。
- 快照表:那么要能查到历史数据情况又该怎么办呢?这个时候快照表就派上用途了,快照表是有时间分区的,每个分区里面的数据都是分区时间对应的前一天的所有全量数据,比如说当前数据表有3个分区,24号,25号,26号。其中,24号分区里面的数据就是从历史到23号的所有数据,25号分区里面的数据就是从历史到24号的所有数据,以此类推。
5. 什么叫维度和度量值
维度:说明数据,维度是指可指定不同值的对象的描述性属性或特征。例如,地理位置的维度可以包括“纬度”、“经度”或“城市名称”。“城市名称”维度的值可以为“旧金山”、“柏林”或“新加坡”。
度量:事实表和维度交叉汇聚的点,度量和维度构成OLAP的主要概念,这里面对于在事实表或者一个多维立方体里面存放的数值型的、连续的字段,就是度量。这符合上面的意思,有标准,一个度量字段肯定是统一单位,例如元、户数。如果一个度量字段,其中的度量值可能是欧元又有可能是美元,那这个度量可没法汇总。在统一计量单位下,对不同维度的描述。
6. 什么叫缓慢变化维(Slowly Changing Dimensions,SCD)
维度建模的数据仓库中,有一个概念叫Slowly Changing Dimensions,中文一般翻译成缓慢变化维,经常被简写为SCD。缓慢变化维的提出是因为在现实世界中,维度的属性并不是静态的,它会随着时间的流失发生缓慢的变化。这种随时间发生变化的维度我们一般称之为缓慢变化维,并且把处理维度表的历史变化信息的问题称为处理缓慢变化维的问题,有时也简称为处理SCD的问题。
处理缓慢变化维的方法通常分为三种方式:
第一种方式是直接覆盖原值。这样处理,最容易实现,但是没有保留历史数据,无法分析历史变化信息。第一种方式通常简称为“TYPE 1”。
第二种方式是添加维度行。这样处理,需要代理键的支持。实现方式是当有维度属性发生变化时,生成一条新的维度记录,主键是新分配的代理键,通过自然键可以和原维度记录保持关联。第二种方式通常简称为“TYPE 2”。例如将当前行的状态设置为off,并设置一个endtime时间戳,将当前时间标记上。同时新增1行,将其状态标记为on,设置begintime时间戳为上一个记录的endtime+1。
第三种方式是添加属性列。这种处理的实现方式是对于需要分析历史信息的属性添加一列,来记录该属性变化前的值,而本属性字段使用TYPE 1来直接覆盖。这种方式的优点是可以同时分析当前及前一次变化的属性值,缺点是只保留了最后一次变化信息。第三种方式通常简称为“TYPE 3”。
6.3 维度表
Ø 维度表可以看作是用户来分析数据的窗口,
Ø 维度表中包含事实数据表中事实记录的特性,有些特性提供描述性信息,有些特性指定如何汇总事实数据表数据,以便为分析者提供有用的信息,
Ø 维度表包含帮助汇总数据的特性的层次结构。6.4 维度分类
维度的类型:
- 缓慢变化维(Slowly Changing Dimension)
- 快速变化维(Rapidly Changing Dimension)
- 大维(Huge Dimension)和迷你维(Mini-Dimension)
- 退化维(Degenerate Dimension)
**缓慢变化维(SCD):
大多数的维度的内容都会有不同程度的改变。比如:
雇员的升职
客户更改了他的名称或地址
我们如何去处理这些维度中的变化呢?
下面提供了三个处理缓慢变化维的方式
直接更新到原先记录中
标记记录有效时间的开始日期和结束日期,加入版本控制
在记录中添加一个字段来记录历史*快速变化维(FCD):
当某个维度的变化是非常快的时候,我们认定他为快速变化维(具体要看实际的变化频率),比如:
产品的价格,地产的价格等
例如在一个分析用户如何使用搜索引擎的DW项目中,将用户搜索的关键字作为一个维度。由于用户使用的关键字会快速变化,因此关键字维度中的数据量会迅速增加。
另外一个例子就是精度为秒的时间维度,每秒就会增加一个维度值通常RCD的处理可以分为3类:
Rapidly Changing Small Dimensions – 即维度表字段并不多,表的数据量也不大的情况。这种情形应用SCD中的Type2就可以了。(即:新增一行,旧行置过期)
Rapidly Changing Large Dimensions – 即维度表字段较多,表的数据量较大的情况。这种情形Type2会增加过多的行并导致效率降低,因此通常采用Type3.(新增列:仅保存上一个值previous_value,current_value)
Rapidly Changing Monster Dimensions – 最糟糕的情况,即维度表字段较多,表的数据量很大,且维度表中的一部分字段频繁变化的情况。此时应将相对稳定的字段和频繁变化的字段分割开,频繁变化的字段独立出来形成新的维度表与事实表相连或形成新的雪花关系。(维表分离)大维度(HugeDimension):
数据仓库中最有意思的维度是一些非常大的维度,比如客户,产品等等。一个大的企业客户维度往往有上百万记录,每条记录又有上百个字段。而大的个人客户维度则会超过千万条记录,这些个人客户维度有时也会有十多个字段,但大多数时候比较少见的维度也只有不多的几个属性。
大维度需要特殊的处理。由于数据量大,很多涉及大维度数据仓库功能可能会很慢,效率很低。你需要采用高效率的设计方法、选择正确的索引、或者采用其它优化技术来处理以下问题,包括:
向大维度表填充数据
非限制维度的浏览性能,尤其是那些属性较少的维度
多限制的维度属性值的浏览时间
涉及大维度表的对事实表查询的低效率问题
为处理第二类修改所需要增加的额外的记录迷你维(MiniDimension):
将常用的大维度中的少数字段提取出来,形成一个字段少的维度,在查询的时候便可以使用迷你维中的字段
这样的设计明显提高查询效率普通维:
普通维是基于一个维表的维度,由维表中的不同列来表示维度中的不同级别。
雪花维:
雪花维是基于多个维表的维度,各个维表间以外键关联,分别存储在同一维度中不同级别的成员列值。
父子维:
父子维是基于两个维表列的维度,由维表中的两列来共同定义各个成员的隶属关系,一列称为成员键列,标识每个成员,另一列称为父键列,标识每个成员的父代。
父子维度通俗的话来讲,这个表是自反 的,即外键本身就是引用的主键;类似这样的关系,如公司组织结构,分公司是总公司的一部分,部门是分公司的一部分,当然如果定义得好的话员工是部门的一部 分;通常公司的组织架构并非处在等层次上的,例如总公司下面的部门看起来就和分公司是一样的层次。因此父子维的层次通常不固定的。
因为父子维的复杂的自引用关系,如果按照缓慢维度的全历史记录方式来处理,必然导致逻辑关系混乱,处理起来比较棘手;任何一个组织的变动 (修改名称,更改引用,新增等等操作 )将会引起其下属节点相应的变动;任何一个意外都会导致整个结构的变化,同时发生意外后所带来的逻辑关系很难理顺。而 SQLServer2000中 Analysis Service对于这种急剧的变化处理并不稳定。
因此建议按照缓慢变化维的覆盖方式解决,即只根据主键这个唯一标志进行判断是否是新增还是修改。
索引:
与在其他关系数据库中一样,索引对数据仓库的性能具有重要作用。每个维度表都必须在主键上建立索引。在其他列如标识层次结构级别的列上,索引对于某些专用查询的性能也很游泳。事实数据表必须在由维度表外键构成的组件主键上建立索引。
粒度(Grain) 层次(Hierarchy):
Ø 粒度是指数据仓库的数据单位中保存数据的细化或综合程度的级别。细化程度越高,粒度级就越小;相反,细化程度越低,粒度级就越大。设计粒度是设计数据仓库中的一个重要的前提
Ø 层次指描述明细数据的层次
一些影响维度建模的因素:
Ø 数据或展现的安全性
Ø 复杂的查询和分析
5.2 数仓分层设计
1. 数据仓库分为5层:
- ODS层 (原始数据层) BDM
- DWD层 (明细数据层) FDM
- DWS层 (服务数据层) GDM
- DWT层(数据主题层) ADM
- ADS层 (数据应用层) APP
2. 各层主要负责职责
- ODS层(原始数据层):存放原始数据,直接加载原始日志、数据,数据保存原貌不做处理。
- DWD层(明细数据层):结构与粒度原始表保持一致,对ODS层数据进行清洗(去除空值、脏数据、超过极限范围的数据)
- DWS层 (服务数据层):以DWD为基础,进行轻度汇总
- DWT层(数据主题层):以DWS为基础,进行累积汇总
- ADS层 (数据应用层):为各种统计报表提供数据
3. **为什么要分层?**
减少重复开发:规范数据分层, 通过的中间层数据, 能够减少极大的重复计算, 增加一次计算结果的复用性。
把复杂问题简单化:一个复杂的任务分解成多个步骤来完成,每一层只处理单一的步骤,比较简单和容易理解。而且便于维护数据的准确性,当数据出现问题之后,可以不用修复所有的数据,只需要从有问题的步骤开始修复
隔离原始数据:不论是数据的异常还是数据的敏感性, 使真实数据与统计数据解耦开
4. 数仓中每层表的建模?怎么建模?
(1)ODS: 特点是保持原始数据的原貌,不作修改!
原始数据怎么建模,ODS就怎么建模!举例: 用户行为数据特征是一条记录就是一行!
ODS层表(line string) 业务数据,参考Sqoop导入的数据类型进行建模!
(2)DWD层:特点从ODS层,将数据进行ETL(清洗),轻度聚合,再展开明细!
- 在展开明细时,对部分维度表进行降维操作
例如:将商品一二三级分类表,sku商品表,spu商品表,商品品牌表合并汇总为一张维度表!
- 对事实表,参考星型模型的建模策略,按照选择业务过程→声明粒度→确认维度→确认事实思路进行建模
选择业务过程: 选择感兴趣的事实表
声明粒度: 选择最细的粒度!可以由最细的粒度通过聚合的方式得到粗粒度!
确认维度: 根据3w原则确认维度,挑选自己感兴趣的维度
确认事实: 挑选感兴趣的度量字段,一般是从事实表中选取!
- DWS层: 根据业务需求进行分主题建模!一般是建宽表!
- DWT层: 根据业务需求进行分主题建模!一般是建宽表!
- ADS层: 根据业务需求进行建模!
5.3 数仓建模
1. 维度建模概念、类型、过程
维度建模:维度建模是一种将数据结构化的逻辑设计方法,它将客观世界划分为度量和上下文。度量是常常是以数值形式出现,事实周围有上下文包围着,这种上下文被直观地分成独立的逻辑块,称之为维度。它与实体-关系建模有很大的区别,实体-关系建模是面向应用,遵循第三范式,以消除数据冗余为目标的设计技术。维度建模是面向分析,为了提高查询性能可以增加数据冗余,反规范化的设计技术。
维度建模过程:确定业务流程->确定粒度->确定纬度->确定事实
建模四步走:
1.选取要建模的业务处理流程
关注业务处理流程,而不是业务部门!
2.定义业务处理的粒度
“如何描述事实表的单个行?”
3.选定用于每个事实表行的维度
常见维度包括日期、产品等
4.确定用于形成每个事实表行的数字型事实
典型的事实包括订货量、支出额这样的可加性数据
2. 星型模型和雪花模型概念、区别
在维度建模的基础上又分为三种模型: 星型模型、 雪花模型、 星座模型。
星型模型:雪花模型与星型模型的区别主要在于维度的层级,标准的星型模型维度只有一层,而雪花模型可能会涉及多级。
雪花模型: 比较靠近3NF, 但是无法完全遵守, 因为遵循3NF的性能成本太高。
星座模型:星座模型与前两种情况的区别是事实表的数量, 星座模型是基于多个事实表。基本上是很多数据仓库的常态, 因为很多数据仓库都是多个事实表的。 所以星座不星座只反映是否有多个事实表, 他们之间是否共享一些维度表。所以星座模型并不和前两个模型冲突。
如何选择
模型的选择首先就是星座不星座这个只跟数据和需求有关系, 跟设计没关系, 不用选择。星型还是雪花, 取决于性能优先, 还是灵活更优先。
目前实际企业开发中, 不会绝对选择一种, 根据情况灵活组合, 甚至并存(一层维度和多层维度都保存) 。 但是整体来看, 更倾向于维度更少的星型模型。 尤其是Hadoop体系, 减少Join就是减少Shuffle, 性能差距很大。(关系型数据可以依靠强大的主键索引)
总结
通过上面的对比,我们可以发现数据仓库大多数时候是比较适合使用星型模型构建底层数据Hive表,通过大量的冗余来提升查询效率,星型模型对OLAP的分析引擎支持比较友好,这一点在Kylin中比较能体现。而雪花模型在关系型数据库中如MySQL,Oracle中非常常见,尤其像电商的数据库表。在数据仓库中雪花模型的应用场景比较少,但也不是没有,所以在具体设计的时候,可以考虑是不是能结合两者的优点参与设计,以此达到设计的最优化目的。
5. 4 数据治理
1)数据压缩
2)小文件合并
3)冷数据处理
5.5 总线架构
总线架构
事实一致性
维度一致性
5.6 数仓总结
![image-20210220171930469](/Users/liyu/Library/Application Support/typora-user-images/image-20210220171930469.png)