1. 初识 Flink
在当前数据量激增的时代,各种业务场景都有大量的业务数据产生,对于这些不断产的数据应该如何进行有效的处理,成为当下大多数公司所面临的问题。目前比较流行的大数据处理引擎 Apache Spark
,基本上已经取代了 MapReduce 成为当前大数据处理的标准。但 对实时数据处理来说,Apache Spark 的 Spark-Streaming 还有性能改进的空间。对于 Spark-Streaming 的流计算本质上还是批(微批)计算
,Apache Flink 就是近年来在开源社区不断发展的技术中的能够同时支持高吞吐
、低延迟
、高性能
的纯实时的分布式处理框架(主要贡献者是阿里(官网支持汉化阅读),QPS可达30W+)。
Flink 是什么
1. Flink 的发展历史
在 2010 年至 2014 年间,由柏林工业大学、柏林洪堡大学和哈索普拉特纳研究所联合发 起名为Stratosphere:Information Management on the Cloud
研究项目,该项目在当时的社区逐渐具有了一定的社区知名度。2014 年 4 月,Stratosphere 代码被贡献给 Apache 软件基金会,成为 Apache 基金会孵化器项目。初期参与该项目的核心成员均是 Stratosphere 曾经的核心成员,之后团队的大部分创始成员离开学校,共同创办了一家名叫 Data Artisans 的公司,其主要业务便是将 Stratosphere,也就是之后的 Flink 实现商业化。在项目孵化 期间,项目 Stratosphere 改名为 Flink。Flink 在德语中是快速和灵敏的意思,用来体现流 式数据处理器速度快和灵活性强等特点,同时使用棕红色松鼠图案作为 Flink 项目的 Logo, 也是为了突出松鼠灵活快速的特点,由此,Flink 正式进入社区开发者的视线。 2014 年 12 月,该项目成为 Apache 软件基金会顶级项目,从 2015 年 9 月
发布第一个稳 定版本 0.9,到目前为止已经发布到 1.9 的版本,更多的社区开发成员逐步加入,现在 Flink 在全球范围内拥有 350 多位开发人员,不断有新的特性发布。同时在全球范围内,越来越多 的公司开始使用 Flink,在国内比较出名的互联网公司如阿里巴巴、美团、滴滴等,都在大 规模使用 Flink 作为企业的分布式大数据处理引擎。
2. Flink 的定义
Apache Flink 是一个框架和分布式处理引擎,用于在无边界
和有边界
数据流上进行有状态
的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。
Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale
3. 有界流和无界流
任何类型的数据都可以形成一种事件流。信用卡交易、传感器测量、机器日志、网站或 移动应用程序上的用户交互记录,所有这些数据都形成一种流。
无界流: 有定义流的开始,但没有定义流的结束。它们会无休止地产生数据。无界流 的数据必须持续处理,即数据被摄取后需要立刻处理。我们不能等到所有数据都到达再处理, 因为输入是无限的,在任何时候输入都不会完成。处理无界数据通常要求以特定顺序摄取事 件,例如事件发生的顺序,以便能够推断结果的完整性。
有界流: 有定义流的开始,也有定义流的结束。有界流可以在摄取所有数据后再进行 计算。有界流所有数据可以被排序,所以并不需要有序摄取。有界流处理通常被称为批处理。跟Spark-Stream类似。
Apache Flink 擅长处理无界和有界数据集精确的时间控制和状态化使得 Flink 的运行时(runtime)能够运行任何处理无界流的应用。有界流则由一些专为固定大小数据集特殊设计的算法和数据结构进行内部处理,产生了出色的性能。
4. 有状态的计算架构
数据产生的本质,其实是一条条真实存在的事件按照时间顺序源源不断的产生,我们很难在数据产生的过程中进行计算并直接产生统计结果,因为这不仅对系统有非常高的要求, 还必须要满足高性能、高吞吐、低延时等众多目标。而有状态流计算架构(如图所示)的提 出,从一定程度上满足了企业的这种需求,企业基于实时的流式数据,维护所有计算过程的 状态,所谓状态就是计算过程中产生的中间计算结果
,每次计算新的数据进入到流式系统中 都是基于中间状态结果的基础上进行运算
,最终产生正确的统计结果。基于有状态计算的方式最大的优势是不需要将原始数据重新从外部存储中拿出来
,从而进行全量计算,因为这种计算方式的代价可能是非常高的。从另一个角度讲,用户无须通过调度和协调各种批量计算 工具,从数据仓库中获取数据统计结果,然后再落地存储,这些操作全部都可以基于流式计 算完成,可以极大地减轻系统对其他框架的依赖,减少数据计算过程中的时间损耗以及硬件存储。
2. 为什么要使用 Flink
可以看出有状态流计算将会逐步成为企业作为构建数据平台的架构模式
,而目前从社区 来看,能够满足的只有 Apache Flink。Flink 通过实现 Google Dataflow
流式计算模型实现 了高吞吐、低延迟、高性能兼具实时流式计算框架。同时 Flink 支持高度容错的状态管理, 防止状态在计算过程中因为系统异常而出现丢失,Flink 周期性地通过分布式快照技术 Checkpoints
实现状态的持久化维护,使得即使在系统停机或者异常的情况下都能计算出正 确的结果。
Flink用户 众多,自2019年1月起,阿里巴巴逐步将内部维护的Blink回馈给Flink开源社区,目前贡献代码已超过100万行,国内包括腾讯、百度、字节跳动等公司,国外包括Uber、Lyft、Netflix等公司都是Flink的使用者。
3. Flink 的应用场景
在实际生产的过程中,大量数据在不断地产生,例如金融交易数据、互联网订单数据、 GPS 定位数据、传感器信号、移动终端产生的数据、通信信号数据等,以及我们熟悉的网络 流量监控、服务器产生的日志数据,这些数据最大的共同点就是实时从不同的数据源中产生, 然后再传输到下游的分析系统。针对这些数据类型主要包括实时智能推荐、复杂事件处理、 实时欺诈检测、实时数仓与 ETL 类型、流数据分析类型、实时报表类型等实时业务场景,而 Flink 对于这些类型的场景都有着非常好的支持
1. 实时智能推荐
智能推荐会根据用户历史的购买行为,通过推荐算法训练模型,预测用户未来可能会购 买的物品。对个人来说,推荐系统起着信息过滤的作用,对 Web/App 服务端来说,推荐系统 起着满足用户个性化需求,提升用户满意度的作用。推荐系统本身也在飞速发展,除了算法 越来越完善,对时延的要求也越来越苛刻和实时化。利用 Flink 流计算帮助用户构建更加实 时的智能推荐系统,对用户行为指标进行实时计算,对模型进行实时更新,对用户指标进行 实时预测,并将预测的信息推送给 Wep/App 端,帮助用户获取想要的商品信息,另一方面也 帮助企业提升销售额,创造更大的商业价值。
2. 复杂事件处理
对于复杂事件处理,比较常见的案例主要集中于工业领域,例如对车载传感器、机械设备等实时故障检测,这些业务类型通常数据量都非常大,且对数据处理的时效性要求非常高。 通过利用 Flink 提供的 CEP
(复杂事件处理)进行事件模式的抽取,同时应用 Flink 的 Sql 进行事件数据的转换,在流式系统中构建实时规则引擎,一旦事件触发报警规则,便立即将 告警结果传输至下游通知系统,从而实现对设备故障快速预警监测,车辆状态监控等目的。
3. 实时欺诈检测
在金融领域的业务中,常常出现各种类型的欺诈行为,例如信用卡欺诈、信贷申请欺诈等,而如何保证用户和公司的资金安全,是近年来许多金融公司及银行共同面对的挑战。 随着不法分子欺诈手段的不断升级,传统的反欺诈手段已经不足以解决目前所面临的问题。 以往可能需要几个小时才能通过交易数据计算出用户的行为指标
,然后通过规则判别出具有 欺诈行为嫌疑的用户,再进行案件调查处理,在这种情况下资金可能早已被不法分子转移, 从而给企业和用户造成大量的经济损失。而运用 Flink 流式计算技术能够在毫秒内就完成对 欺诈判断行为指标的计算,然后实时对交易流水进行规则判断或者模型预测,这样一旦检测 出交易中存在欺诈嫌疑,则直接对交易进行实时拦截,避免因为处理不及时而导致的经济损 失。
4. 实时数仓与 ETL 结合离线数仓
通过利用流计算诸多优势和 SQL 灵活的加工能力,对流式数据进行实时清洗
、归并
、结构化处理
,为离线数仓进行补充和优化。另一方面结合实时数据 ETL 处理能 力,利用有状态流式计算技术,可以尽可能降低企业由于在离线数据计算过程中调度逻辑的复杂度,高效快速地处理企业需要的统计结果,帮助企业更好地应用实时数据所分析出来的结果。
5. 流数据分析
实时计算各类数据指标
,并利用实时结果
及时调整在线系统相关策略,在各类内容投放、 无线智能推送领域有大量的应用。流式计算技术将数据分析场景实时化,帮助企业做到实时化分析 Web 应用或者 App 应用的各项指标,包括 App 版本分布情况、Crash 检测和分布等, 同时提供多维度用户行为分析,支持日志自主分析,助力开发者实现基于大数据技术的精细 化运营、提升产品质量和体验、增强用户黏性。
6. 实时报表分析
实时报表分析
是近年来很多公司采用的报表统计方案之一,其中最主要的应用便是实时大屏展示。利用流式计算实时得出的结果直接被推送到前端应用,实时显示出重要指标的变 换情况。最典型的案例便是淘宝的双十一活动
,每年双十一购物节,除疯狂购物外,最引人 注目的就是天猫双十一大屏不停跳跃的成交总额。在整个计算链路中包括从天猫交易下单购买到数据采集
、数据计算
、数据校验
,最终落到双十一大屏上展现的全链路时间压缩在 5 秒以内,顶峰计算性能高达数三十万笔订单/秒
,通过多条链路流计算备份确保万无一失
。 而在其他行业,企业也在构建自己的实时报表系统,让企业能够依托于自身的业务数据,快 速提取出更多的数据价值,从而更好地服务于企业运行过程中。
4. Flink 的特点和优势
Flink 的具体优势和特点有以下几点
1. 同时支持高吞吐、低延迟、高性能
Flink 是目前开源社区中唯 一 一套集高吞吐
、低延迟
、高性能
三者于一身的分布式流式数据处理框架
。像 Apache Spark 也只能兼顾高吞吐和高性能特性,主要因为在 Spark Streaming 流式计算中无法做到低延迟保障;而流式计算框架 Apache Storm
只能支持低延迟和高性能特性,但是无法满足高吞吐的要求。而满足高吞吐、低延迟、高 性能这三个目标对分布式流式计算框架来说是非常重要的。
2. 支持事件时间(Event Time)概念
在流式计算领域中,窗口计算的地位举足轻重,但目前大多数框架窗口计算采用的都是系统时间
(Process Time),也是事件传输到计算框架处理时,系统主机的当前时间。Flink 能够支持基于事件时间
(Event Time)语义进行窗口计算,也就是使用事件产生的时间,这种基于事件驱动的机制使得事件即使乱序到达,流系统也能够计算出 确的结果,保持了事件原本产生时的时序性,尽可能避免网络传输或硬件系统的影响。
Event Time/Processing Time/Ingestion Time,也就是事件时间、处理时间、提取时间,那么这三个时间有什么区别和联系
下图是一个信号站,分别列出了事件时间、处理时间、提取时间的先后顺序。当然上面图示需要你对Flink有一个基本的了解。我们先白话解释,然后在官方解释。
- Event Time:也就是事件发生的时间,事件的发生时间。我们有些同学可能会模糊,这里举个例子,我们
产生日志的时间
,这个应该清楚的,日志的时间戳就是发生时间
。 - Processing Time:也就是处理时间,我们看到了这个已经进入Flink程序,也就是我们读取数据源时间,也就是日志到达Flink的时间,但是这个时间是
本地机器的时间
。 - Ingestion Time:也就是提取时间,我们看到它比处理时间还晚一些,这个时候数据已经发送给窗口,也就是
发送给窗口
的时间,也就是程序处理计算的时间。
3. 支持有状态计算
Flink 在 1.4 版本中实现了状态管理
,所谓状态就是在流式计算过程中将算子的中间结果数据保存在内存或者文件系统中,等下一个事件进入算子后可以从之前的状态中 获取中间结果中计算当前的结果,从而无须每次都基于全部的原始数据来统计结果,这 种方式极大地提升了系统的性能
,并降低了数据计算过程的资源消耗。对于数据量大且运算逻辑非常复杂的流式计算场景,有状态计算发挥了非常重要的作用。
4.支持高度灵活的窗口(Window)操作
在流处理应用中,数据是连续不断的,需要通过窗口
的方式对流数据进行一定范围 的聚合计算,例如统计在过去的 1 分钟内有多少用户点击某一网页,在这种情况下,我 们必须定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行再计 算。Flink 将窗口划分为基于 Time、Count、Session,以及 Data-driven 等类型的窗口 操作,窗口
可以用灵活的触发条件定制化来达到对复杂的流传输模式的支持,用户可以 定义不同的窗口触发机制来满足不同的需求。
5.基于轻量级分布式快照(CheckPoint)实现的容错
Flink 能够分布式运行在上千个节点
上,将一个大型计算任务的流程拆解成小的计算过程
,然后将 tesk 分布到并行节点上进行处理。在任务执行过程中,能够自动发现事件处理过程中的错误而导致数据不一致的问题,比如:节点宕机、网路传输问题,或 是由于用户因为升级或修复问题而导致计算服务重启等。在这些情况下,通过基于分布 式快照技术的Checkpoints
,将执行过程中的状态信息进行持久化存储
,一旦任务出现异常停止,Flink 就能够从 Checkpoints 中进行任务的自动恢复
,以确保数据在处理过 程中的精准一致性(Exactly-Once
)。快照是默认自动开启实现的。
6.基于 JVM 实现独立的内存管理
内存管理是所有计算框架需要重点考虑的部分,尤其对于计算量比较大的计算场 景,数据在内存中该如何进行管理显得至关重要。针对内存管理,Flink 实现了自身管理内存的机制
,尽可能减少 JVM GC 对系统的影响。另外,Flink 通过序列化/反序列化 方法将所有的数据对象转换成二进制在内存中存储,降低数据存储的大小
的同时,能够更加有效地对内存空间进行利用
,降低 GC 带来的性能下降或任务异常的风险,因此 Flink 较其他分布式处理的框架会显得更加稳定,不会因为 JVM GC 等问题而影响整个 应用的运行。
7. Save Points(保存点)
对于 7*24 小时运行的流式应用,数据源源不断地接入,在一段时间内应用的终止有可能导致数据的丢失或者计算结果的不准确,例如进行集群版本的升级、停机运维操 作等操作。值得一提的是,Flink 通过 SavePoints
技术将任务执行的快照保存在存储介质上,当任务重启的时候可以直接从事先保存的 Save Points 恢复原有的计算状态, 使得任务继续按照停机之前的状态运行,Save Points 技术可以让用户更好地管理和运 维实时流式应用。不过需要手动启动跟恢复数据。
5. 常见实时计算框架对比
产品 | 模型 | API | 保证次数 | 容错机制 | 状态管理 | 延时 | 吞吐量 |
---|---|---|---|---|---|---|---|
Storm | Native(数据实时进入处理) | 组合式(基础API) | At-least-once(至少一次) | Record ACK(ACK机制) | 无 | 低 | 低 |
Trident | Micro-Batching(划分为小批次处理) | 组合式 | Exactly-once(精准一致性) | Record ACK | 基于操作(每个操作都有一个状态) | 中等 | 中等 |
Spark-Streaming | Micro-Batching | 声明式(提供封装后的高阶函数,比如Count) | Exactly-once | RDD CheckPoint(基于RDD做CheckPoint) | 基于DStream | 中等 | 高 |
Flink | Native | 声明式 | Exactly-once | CheckPoint(Flink的一种快照) | 基于操作 | 低 | 高 |
模型
:
Storm 和 Flink 是真正的一条一条处理数据;而 Trident(Storm 的封装框架) 和 Spark Streaming 其实都是小批处理,一次处理一批数据(小批量)。API
:
Storm 和 Trident 都使用基础 API 进行开发,比如实现一个简单的 sum 求和操作; 而 Spark Streaming 和 Flink 中都提供封装后的高阶函数,可以直接拿来使用,这样就 比较方便了。保证次数
:
在数据处理方面,Storm 可以实现至少处理一次,但不能保证仅处理一次, 这样就会导致数据重复处理问题,所以针对计数类的需求,可能会产生一些误差; Trident 通过事务可以保证对数据实现仅一次的处理,Spark Streaming 和 Flink 也是 如此。容错机制
:
Storm和Trident可以通过ACK
机制实现数据的容错机制,而Spark Streaming 和 Flink 可以通过CheckPoint
机制实现容错机制。状态管理
:
Storm 中没有实现状态管理,Spark Streaming 实现了基于 DStream 的状态 管理,而 Trident 和 Flink 实现了基于操作的状态管理。延时
:
表示数据处理的延时情况,因此 Storm 和 Flink 接收到一条数据就处理一条数据, 其数据处理的延时性是很低的;而 Trident 和 Spark Streaming 都是小型批处理,它们 数据处理的延时性相对会偏高。吞吐量
:
Storm 的吞吐量其实也不低,只是相对于其他几个框架而言较低;Trident 属于中等;而 Spark Streaming 和 Flink 的吞吐量是比较高的。
2.Flink编程入门
1. Flink 的开发环境
Flink 课程选择的是 Apache Flink 1.9.1 版本,是目前较的稳定版本,并且 兼容性比较好。
下载地址: https://flink.apache.org/zh/downloads.html
1. 开发工具
先说明一下开发工具的问题。官方建议使用IntelliJ IDEA
,因为它默认集成了 Scala
和Maven
环境,使用更加方便,当然使用 Eclipse 也是可以的。本文使用 IDEA。开发Flink 程序时,可以使用Java
、Python
或者Scala
语言,本教程使用 Scala,因为 使用 Scala 实现函数式编程会比较简洁。
2. 配置依赖
开发 Flink 应用程序需要最低限度的 API 依赖。最低的依赖库包括:flink-scala
和 flink-streaming-scala
。大多数应用需要依赖特定的连接器或其他类库,例如 Kafka 的连 接器、TableAPI、CEP 库等。这些不是 Flink 核心依赖的一部分,因此必须作为依赖项手 动添加到应用程序中。
与其他运行用户自定义应用的大多数系统一样,Flink 中有两大类依赖类库
- Flink 核心依赖:
Flink 本身包含运行所需的一组类和依赖,比如协调、网络通讯、checkpoint、容错处理、API、算子(如窗口操作)、 资源管理等,这些类和依赖形成了 Flink 运行时的核心。当 Flink 应用启动时,这些依赖必须可用。
这些核心类和依赖被打包在 flink-dist jar 里。它们是 Flink lib 文件夹下的一部分,也是 Flink 基本容器镜像的一部分。 这些依赖类似 Java String 和 List 的核心类库(rt.jar, charsets.jar等)。
Flink 核心依赖不包含连接器和类库(如 CEP、SQL、ML 等),这样做的目的是默认情况下避免在类路径中具有过多的依赖项和类。 实际上,我们希望尽可能保持核心依赖足够精简,以保证一个较小的默认类路径,并且避免依赖冲突。 - 用户应用依赖:
是指特定的应用程序需要的类库,如连接器,formats等。用户应用代码和所需的连接器以及其他类库依赖通常被打包到 application jar 中。用户应用程序依赖项不需包括 Flink DataSet / DataStream API 以及运行时依赖项,因为它们已经是 Flink 核心依赖项的一部分。
Flink官方依赖文档说明:官方依赖入手
2.WordCount演示
添加pom依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.sowhat</groupId>
<artifactId>Flink-Test</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<!-- 上述两个是核心依赖-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-filesystem_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.44</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep-scala_2.11</artifactId>
<version>1.9.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- 该插件用于将Scala代码编译成class文件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.4.6</version>
<executions>
<execution>
<!-- 声明绑定到maven的compile阶段 -->
<goals>
<goal>testCompile</goal>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- Java Compiler https://blog.csdn.net/liupeifeng3514/article/details/80236077 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source> <!-- 开始代码时指定的JDK版本-->
<target>1.8</target> <!-- 编译成.class 文件所需版本-->
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
<!-- 表示会出现一个无任何依赖的jar,还有一个包含所有依赖的jar -- >
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
注意事项: 所有这些 依赖项 的作用域都应该设置为 provided
。 这意味着需要这些依赖进行编译,但不应
将它们打包到项目生成的应用程序jar文件中,因为这些依赖项是 Flink 的核心依赖,服务器运行环境在应用启动前已经是可用的状态了。
我们强烈建议保持这些依赖的作用域为 provided
。 如果它们的作用域未设置为 provided ,则典型的情况是因为包含了 Flink 的核心依赖而导致生成的jar包变得过大。 最糟糕的情况是添加到应用程序的 Flink 核心依赖项与你自己的一些依赖项版本冲突(通常通过反向类加载来避免)。
IntelliJ 上的一些注意事项
: 为了可以让 Flink 应用在 IntelliJ IDEA 中运行,这些 Flink 核心依赖的作用域需要设置为 compile
而不是provided
。 否则 IntelliJ 不会添加这些依赖到 classpath,会导致应用运行时抛出 NoClassDefFountError 异常。为了避免声明这些依赖的作用域为 compile (因为我们不推荐这样做), 上文给出的 Java 和 Scala 项目模板使用了一个小技巧:添加了一个 profile,仅当应用程序在 IntelliJ 中运行时该 profile 才会被激活, 然后将依赖作用域设置为 compile ,从而不影响应用 jar 包。
1. 流式接受数据
案例需求
:采用 Netcat 数据源发送数据,使用 Flink 统计每个单词的数量。
package com.sowhat.flink
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
/**
* flink的流计算的WordCount
*/
object FlinkStreamWordCount {
def main(args: Array[String]): Unit = {
//1、初始化Flink流计算的环境
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//修改并行度
streamEnv.setParallelism(1) //默认所有算子的并行度为1
//2、导入隐式转换
import org.apache.flink.streaming.api.scala._
//3、读取数据,读取sock流中的数据
val stream: DataStream[String] = streamEnv.socketTextStream("IP", 8899) //DataStream ==> spark 中Dstream
//4、转换和处理数据
val result: DataStream[(String, Int)] = stream.flatMap(_.split(" "))
.map((_, 1)).setParallelism(2)
.keyBy(0) //分组算子 : 0 或者 1 代表下标。前面的DataStream[二元组] , 0代表单词 ,1代表单词出现的次数
.sum(1).setParallelism(2) //聚会累加算子
//5、打印结果
result.print("结果").setParallelism(1)
//6、启动流计算程序
streamEnv.execute("wordcount")
}
}
12345678910111213141516171819202122232425262728293031
找一个服务可以接受到的接口发送若干信息:
$ nc -lk 8899
hadoop spark hive flink
flink sowhat liu spark
flink sowhat
---
结果> (hive,1)
结果> (spark,1)
结果> (hadoop,1)
结果> (flink,1)
结果> (sowhat,1)
结果> (spark,2)
结果> (liu,1)
结果> (flink,2)
结果> (sowhat,2)
结果> (flink,3)
123456789101112131415
PS:如果将代码中所有关于并行度的全部屏蔽掉,系统会自动的将全部CPU利用起来,然后利用Hash算法来将数据归类给不同的CPU核心来处理,结果可能如下:
结果:1> (hive,1) // 表示第几个核给出的结果
结果:4> (flink,1)
结果:1> (spark,1)
结果:4> (sohat,1)
结果:1> (hive,2)
结果:2> (node,1)
结果:4> (zookeeper,1)
结果:3> (manager,1)
12345678
2. 直接统计指定文件WordCount
需求
:读取本地数据文件,统计文件中每个单词出现的次数。 根据需求,很明显是有界流(批计算),所以采用另外一个上下文环境:ExecutionEnvironment
在IDEA的resources
目录下创建个wc.txt
文件内容如下:
hello flink spark
hello spark
spark core flink stream
hello fink
1234
批量统计代码如下:
package com.sowhat.flink
import java.net.{URL, URLDecoder}
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}
/**
* Flink的批计算案例
*/
object BatchWordCount {
def main(args: Array[String]): Unit = {
//初始化Flink批处理环境
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val dataPath: URL = getClass.getResource("/wc.txt") //使用相对路径来得到完整的文件路径
var packagePath: String = dataPath.getPath().replaceAll("%20", ""); //解决路径中含有空格的情况
val str:String = URLDecoder.decode(packagePath, "utf-8"); //解决路径包含中文的情况
println(str)
//读数据
val data: DataSet[String] = env.readTextFile(str) //DataSet ==> spark RDD
//计算并且打印结果
data.flatMap(_.split(" "))
.map((_, 1))
.groupBy(0) //分组算子 : 0 或者 1 代表下标。前面的DataStream[二元组] , 0代表单词 ,1代表单词出现的次数
.sum(1)
.print()
}
}
12345678910111213141516171819202122232425262728293031
3. Flink 的安装和部署
Flink 的安装和部署主要分为本地(单机
)模式和集群
模式,其中本地模式只需直接解压就可以使用,不用修改任何参数,一般在做一些简单测试的时候使用。本地模式不再赘述。集群模式包含:
- Standalone
- Flink on Yarn(重点)
- Mesos
- Docker
- Kubernetes
- AWS
- Goole Compute Engine
目前在企业中使用最多的是 Flink on Yarn 模式。本文主讲Standalone
和Flink on Yarn
这两种模式。
1. 集群基本架构
Flink 整个系统主要由两个组件组成,分别为 JobManager
和 TaskManager
,Flink 架构也遵循Master-Slave
架构设计原则,JobManager 为 Master 节点,TaskManager 为 Worker (Slave)节点。所有组件之间的通信都是借助于 Akka Framework
,包括任务的状态以及 Checkpoint 触发等信息。
1. Client 客户端
客户端负责将任务提交到集群,与 JobManager
构建 Akka
连接,然后将任务提交到 JobManager
,通过和 JobManager
之间进行交互获取任务执行状态。客户端提交任务可以采 用 CLI 方式
或者通过使用 Flink WebUI
提交,也可以在应用程序中指定JobManager
的RPC
网络端口构建 ExecutionEnvironment 提交 Flink 应用。
2.JobManager
JobManager 负责整个 Flink 集群任务的调度以及资源的管理
,从客户端中获取提交的 应用,然后根据集群中 TaskManager 上 TaskSlot 的使用情况,为提交的应用分配相应的 TaskSlots 资源并命令 TaskManger 启动从客户端中获取的应用。JobManager 相当于整个集 群的 Master 节点,且整个集群中有且仅有一个活跃的 JobManager,负责整个集群的任务管 理和资源管理
。JobManager 和 TaskManager 之间通过 Actor System 进行通信,获取任务执 行的情况并通过 Actor System 将应用的任务执行情况发送给客户端。同时在任务执行过程 中,Flink JobManager
会触发 Checkpoints
操作,每个TaskManager
节点收到 Checkpoint 触发指令后,完成 Checkpoint
操作,所有的Checkpoint
协调过程都是在 Flink JobManager
中完成。当任务完成后,Flink 会将任务执行的信息反馈给客户端,并且释放掉 TaskManager
中的资源以供下一次提交任务使用。
3. TaskManager
TaskManager 相当于整个集群的 Slave 节点,负责具体的任务执行和对应任务在每个节 点上的资源申请与管理。客户端通过将编写好的 Flink 应用编译打包,提交到 JobManager, 然后 JobManager 会根据已经注册在 JobManager 中 TaskManager 的资源情况,将任务分配给 有资源的 TaskManager 节点,然后启动并运行任务。TaskManager 从 JobManager 接收需要 部署的任务,然后使用 Slot 资源启动 Task,建立数据接入的网络连接,接收数据并开始数 据处理。同时 TaskManager 之间的数据交互都是通过数据流的方式进行的。 可以看出,Flink 的任务运行其实是采用多线程的方式
,这和 MapReduce 多 JVM 进程的 方式有很大的区别Fink 能够极大提高 CPU 使用效率
,在多个任务和 Task 之间通过 TaskSlot
方式共享系统资源,每个 TaskManager 中通过管理多个 TaskSlot 资源池进行对资源进行有 效管理。
PS
:可以认为JobManager类似Hadoop中ApplicationMaster,然后一个机器就是一个TaskManager,一个TaskManager可以分解成若干个Flink基本工作单元TaskSlot
。
2. Standalone 集群安装和部署
Standalone 是 Flink 的独立部署模式,它不依赖其他平台。在使用这种模式搭建 Flink 集群之前,需要先规划集群机器信息。在这里为了搭建一个标准的 Flink 集群,需要准备 3 台 Linux。
- 下载并解压文件到指定目录
- 修改配置文件
进入到 conf 目录下,编辑 flink-conf.yaml 配置文件
jobmanager.rpc.address: hadoop101
# The RPC port where the JobManager is reachable.
jobmanager.rpc.port: 6123
# The heap size for the JobManager JVM
jobmanager.heap.size: 1024m # JobManager 内存大小
# The total process memory size for the TaskManager.
#
# Note this accounts for all memory usage within the TaskManager process, including JVM metaspace and other overhead.
taskmanager.memory.process.size: 1024m # TaskManager初始化内存大小
# To exclude JVM metaspace and overhead, please, use total Flink memory size instead of 'taskmanager.memory.process.size'.
# It is not recommended to set both 'taskmanager.memory.process.size' and Flink memory.
#
# taskmanager.memory.flink.size: 1280m
# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
taskmanager.numberOfTaskSlots: 3 # 每一个TaskManager 有几个TaskSlots
# The parallelism used for programs that did not specify and other parallelism.
parallelism.default: 1 # 默认并行度
1234567891011121314151617181920212223242526272829
- 编辑slaves文件
vi slaves
hadoop101
hadoop102
hadoop103
1234
- 信息分发
[root@hadoop101 home]# scp -r flink-1.9.1 root@hadoop102:`pwd`
[root@hadoop101 home]# scp -r flink-1.9.1 root@hadoop103:`pwd`
12
- 主节点启动集群
- WebUI 访问
flink-conf.yaml
的配置文件中rest.port
是WebUI的对外端口,服务器输入hadoop101:8081
即可访问(我这里随便找个别人搭建看的集群看下WebUI)。
左侧栏多点点看看即可,相对来说比较简单。 - 将IDEA代码中的两个Flink核心依赖设置为
provided
然后打包(打包的时候经常性出现问题需检查)通过WebUI上传。
测试结果如下:
[root@hadoop101 home]# nc -lk 8899
12 21 21
12
PS
:敲重点 IDEA中用到的Flink-scala核心依赖要跟服务器集群的核心依赖版本一致
,否则会 报错!
java.lang.NoSuchMethodError: scala.Predef$.refArrayOps
- 命令行提交
命令行提交 flink同样支持两种提交方式,默认不指定就是客户端方式。如果需要使用集群方式提交的话。可以在提交作业的命令行中指定-d或者–detached 进行进群模式提交。
-d,–detached If present, runs the job in detached mode(分离模式)
客户端提交方式:$FLINK_HOME/bin/flink run -c com.daxin.batch.App flinkwordcount.jar
客户端会多出来一个CliFrontend进程,就是驱动进程。
集群模式提交:$FLINK_HOME/bin/flink run -d -c com.daxin.batch.App flinkwordcount.jar
程序提交完毕退出客户端,不再打印作业进度等信息!
1234
- 重要参数说明:下面针对 flink-conf.yaml 文件中的几个重要参数进行分析:
- jobmanager.heap.size:JobManager 节点可用的内存大小。
- taskmanager.heap.size:TaskManager 节点可用的内存大小。
- taskmanager.numberOfTaskSlots:每台机器可用的 Slot 数量。
- parallelism.default:默认情况下 Flink 任务的并行度。
上面参数中所说的 Slot
和 parallelism
的区别:
- Slot 是静态的概念,是指 TaskManager 具有的并发执行能力。
- parallelism 是动态的概念,是指程序运行时实际使用的并发能力。
- 设置合适的 parallelism 能提高运算效率。
- 比如我又4个跑道(Slot ),本次任务我占用2个(parallelism)。 一般情况下Slot
>=
parallelism
3. Flink 提交到 Yarn
Flink on Yarn 模式的原理是依靠 YARN 来调度 Flink 任务
,目前在企业中使用较多。 这种模式的好处是可以充分利用集群资源,提高集群机器的利用率,并且只需要 1 套 Hadoop 集群,就可以执行 MapReduce 和 Spark 任务,还可以执行 Flink 任务等,操作非常方便,不 需要维护多套集群,运维方面也很轻松。Flink on Yarn 模式需要依赖 Hadoop 集群,并且 Hadoop 的版本需要是 2.2 及以上。本文选择的 Hadoop 版本是 2.7.2。
Flink On Yarn 的内部实现原理(Snagit Editor绘制):
- 当启动一个新的 Flink YARN Client 会话时,客户端首先会检查所请求的资源(容器和内存)是否可用。之后,它会上传 Flink 配置和 JAR 文件到 HDFS。
- 客 户 端 的 下 一 步 是 请 求 一 个 YARN 容 器 启 动 ApplicationMaster 。 JobManager 和 ApplicationMaster(AM)运行在同一个容器中,一旦它们成功地启动了,AM 就能够知道 JobManager 的地址,它会为 TaskManager 生成一个新的 Flink 配置文件(这样它才能连 上 JobManager),该文件也同样会被上传到 HDFS。另外,AM 容器还提供了 Flink 的 Web 界面服务。Flink 用来提供服务的端口是由用户和应用程序 ID 作为偏移配置的,这 使得用户能够并行执行多个 YARN 会话。
- 之后,AM 开始为 Flink 的 TaskManager 分配容器(Container),从 HDFS 下载 JAR 文件 和修改过的配置文件。一旦这些步骤完成了,Flink 就安装完成并准备接受任务了
Flink on Yarn 模式在使用的时候又可以分为两种
:
第 1 种模式(Session-Cluster):
是在 YARN 中提前
初始化一个 Flink 集群(称为 Flink yarn-session),开辟指定的资源,以后的 Flink 任务都提交到这里。这个 Flink 集群会常驻
在 YARN 集群中,除非手工停止。这种方式创建的 Flink 集群会独占资源
,不管有没有 Flink 任务在执行,YARN 上面的其他任务都无法使用这些资源。一般此种方式用的较少。
第 2 种模式(Per-Job-Cluster):
每次提交 Flink 任务都会创建一个新的 Flink 集群, 每个 Flink 任务之间相互独立、互不影响,管理方便。任务执行完成之后创建的 Flink 集群也会消失,不会额外占用资源,按需使用,这使资源利用率达到最大,在工作中推荐使用这种模式。注意
:Flink on Yarn 还需要两个先决条件:
- 配置 Hadoop 的环境变量
- 下载 Flink 提交到 Hadoop 的连接器(jar 包 大约40M),并把 jar 拷贝到 Flink 的 lib 目录下
[root@hadoop101 flink-1.9.1]# cp /home/flink-shaded-hadoop-2-uber-2.7.5-7.0.jar lib/
1
启动第一种 Session-Cluster 模式(yarn-session)
1 先启动 Hadoop 集群,然后通过命令启动一个 Flink 的 yarn-session 集群:
[root@hadoop101 flink-1.9.1]# bin/yarn-session.sh -n 3 -s 3 -nm sowhat -d
1
其中 yarn-session.sh 后面支持多个参数。下面针对一些常见的参数进行讲解:
- -n、–container 表示分配容器的数量(也就是 TaskManager 的数量)。
- -D 动态属性。
- -d、–detached 在后台独立运行。
- -jm、–jobManagerMemory :设置 JobManager 的内存,单位是 MB。
- -nm、–name:在 YARN 上为一个自定义的应用设置一个名字。
- -q、–query:显示 YARN 中可用的资源(内存、cpu 核数)。
- -qu、–queue :指定 YARN 队列。
- -s、–slots :每个 TaskManager 使用的 Slot 数量。
- -tm、–taskManagerMemory :每个 TaskManager 的内存,单位是 MB。
- -z、–zookeeperNamespace :针对 HA 模式在 ZooKeeper 上创建 NameSpace。
- -id、–applicationId :指定 YARN 集群上的任务 ID,附着到一个后台独 立运行的 yarn session 中。
查看 WebUI: 由于还没有提交 Flink job,所以都是 0。
这个时候注意查看本地文件系统中有一个临时文件。有了这个文件可以提交 job 到 Yarn
提交 Job : 由于有了之前的配置,所以自动会提交到 Yarn 中。
bin/flink run -c com.bjsxt.flink.StreamWordCount /home/Flink-Demo-1.0-SNAPSHOT.jar
1
至此第一种模式全部完成。
启动第二种模式
这种模式下不需要先启动 yarn-session。所以我们可以把前面启动的 yarn-session 集 群先停止,停止的命令是:
yarn application -kill application_1576832892572_0002 //其中 application_1576832892572_0002 是ID
1
确保 Hadoop 集群是健康的情况下直接提交 Job 命令:
bin/flink run -m yarn-cluster -yn 3 -ys 3 -ynm sowhat02 \
-c com.sowhat.flink.StreamWordCount /home/Flink-Demo-1.0-SNAPSHOT.jar
12
可以看到一个全新的 yarn-session
任务提交参数讲解:相对于 Yarn-Session 参数而言,只是前面加了 y。
- -yn、–container 表示分配容器的数量,也就是 TaskManager 的数量。
- -d、–detached:设置在后台运行。
- -yjm、–jobManagerMemory:设置 JobManager 的内存,单位是 MB。
- -ytm、–taskManagerMemory:设置每个 TaskManager 的内存,单位是 MB。
- -ynm、–name:给当前 Flink application 在 Yarn 上指定名称。
- -yq、–query:显示 yarn 中可用的资源(内存、cpu 核数)
- -yqu、–queue :指定 yarn 资源队列
- -ys、–slots :每个 TaskManager 使用的 Slot 数量。
- -yz、–zookeeperNamespace:针对 HA 模式在 Zookeeper 上创建 NameSpace
- -yid、–applicationID : 指定 Yarn 集群上的任务 ID,附着到一个后台独 立运行的 Yarn Session 中。
4. Flink 的HA
默认情况下,每个 Flink 集群只有一个 JobManager,这将导致单点故障(SPOF
),如 果这个 JobManager 挂了,则不能提交新的任务,并且运行中的程序也会失败。使用 JobManager HA,集群可以从 JobManager 故障中恢复,从而避免单点故障。用户可以在 Standalone
或Flink on Yarn
集群模式下配置 Flink 集群 HA(高可用性)。
Standalone HA
Standalone 模式下,JobManager 的高可用性的基本思想是,任何时候都有一个 Alive JobManager 和多个 Standby JobManager。Standby JobManager 可以在 Alive JobManager 挂掉的情况下接管集群成为 Alive JobManager,这样避免了单点故障,一旦某一个 Standby JobManager 接管集群,程序就可以继续运行。Standby JobManagers 和 Alive JobManager 实例之间没有明确区别,每个 JobManager 都可以成为 Alive 或 Standby
Flink Standalone 集群的 HA 安装和配置
实现 HA 还需要依赖 ZooKeeper 和 HDFS,因此要有一个 ZooKeeper 集群和 Hadoop 集群, 首先启动 Zookeeper 集群和 HDFS 集群。本文中分配 3 台 JobManager,如下表:
hadoop101 | hadoop102 | hadoop103 |
---|---|---|
JobManager | JobManager | JobManager |
TaskManager | TaskManager | TaskManager |
- 修改配置文件 conf/masters
hadoop101:8081
hadoop102:8081
hadoop103:8081
123
- 修改配置文件 conf/flink-conf.yaml
#要启用高可用,设置修改为zookeeper
high-availability: zookeeper
#Zookeeper的主机名和端口信息,多个参数之间用逗号隔开
high-availability.zookeeper.quorum: hadoop103:2181,hadoop101:2181,hadoop102:2181
# 建议指定HDFS的全路径。如果某个Flink节点没有配置HDFS的话,不指定HDFS的全路径 则无法识到,
# storageDir存储了恢复一个JobManager所需的所有元数据。
high-availability.storageDir: hdfs://hadoop101:9000/flink/h
1234567
- 把修改的配置文件拷贝其他服务器中
[root@hadoop101 conf]# scp masters flink-conf.yaml root@hadoop102:`pwd`
[root@hadoop101 conf]# scp masters flink-conf.yaml root@hadoop103:`pwd`
12
- 启动集群
版本问题:目前使用 Flink1.7.1 版本测试没有问题,使用 Flink1.9 版本存在 HA 界面不能自动跳转到对应的 Alive jobManager。
Flink On Yarn HA
正常基于 Yarn 提交 Flink 程序,无论是使用 yarn-session
模式还是 yarn-cluster
模 式 , 基 于 yarn 运 行 后 的 application 只 要 kill 掉 对 应 的 Flink 集 群 进 程YarnSessionClusterEntrypoint
后,基于 Yarn 的 Flink 任务就失败了,不会
自动进行重试,所以基于 Yarn 运行 Flink 任务,也有必要搭建 HA,这里同样还是需要借助 zookeeper 来完成,步骤如下:
- 修改所有 Hadoop 节点的 yarn-site.xml 将所有 Hadoop 节点的 yarn-site.xml 中的提交应用程序最大尝试次数调大
#在每台hadoop节点yarn-site.xml中设置提交应用程序的最大尝试次数,建议不低于4,
# 这里重试指的ApplicationMaster
<property>
<name>yarn.resourcemanager.am.max-attempts</name>
<value>4</value>
</property>
123456
- 启动 zookeeper,启动 Hadoop 集群
- 修改 Flink 对应 flink-conf.yaml 配置,配置内容如下:
#配置依赖zookeeper模式进行HA搭建
high-availability: zookeeper
#配置JobManager原数据存储路径 high-availability.storageDir: hdfs://hadoop101:9000/flink/yarnha/
#配置zookeeper集群节点
high-availability.zookeeper.quorum: hadoop101:2181,hadoop102:2181,hadoop103:2181
#yarn停止一个application重试的次数
yarn.application-attempts: 10
1234567
- 启动 yarn-session.sh 测试 HA: yarn-session.sh -n 2 ,也可以直接提交 Job 启动之后,可以登录 yarn 中对应的 flink WebUI,如下图示:
- 点击对应的 Tracking UI,进入 Flink 集群 UI
查看对应的 JobManager 在哪台节点上启动:
进入对应的节点,kill 掉对应的YarnSessionClusterEntrypoint
进程。然后进入到 Yarn 中观察applicationxxxx_0001
job 信息:
点击 job ID,发现会有对应的重试信息:
点击对应的Tracking U
进入到 Flink 集群 UI,查看新的 JobManager 节点由原来的 hadoop103 变成了 hadoop101,说明 HA 起作用。
4. Flink 并行度和 Slot
Flink中每一个worker(TaskManager)都是一个JVM进程,它可能会在独立的线程(Solt) 上执行一个或多个 subtask。Flink 的每个 TaskManager 为集群提供 Solt。Solt 的数量通常 与每个 TaskManager 节点的可用 CPU 内核数成比例,一般情况下 Slot 的数量就是每个节点 的 CPU 的核数
。 Slot 的 数 量 由 集 群 中 flink-conf.yaml 配 置 文 件 中 设 置 taskmanager.numberOfTaskSlots,这个值的大小建议
和节点 CPU 的数量保持一致。比如我设置=3。并行度
=2的情况下:
注意一点:一个TaskSlot可能执行多个job。
一个任务的并行度设置可以从 4 个层面指定:
- Operator Level(算子层面)。
- Execution Environment Level(执行环境层面)。
- Client Level(客户端层面)。
- System Level(系统层面)。
这 些 并 行 度 的 优 先 级 为 :
Operator Level >
Execution Environment Level >
Client Level >
System Level。
1. 并行度设置之 Operator Level
Operator、Source 和 Sink 目的地的并行度可以通过调用 setParallelism()
方法来指定
val result: DataStream[(String, Int)] = stream.flatMap(_.split(" "))
.map((_, 1)).setParallelism(2)
.keyBy(0) //分组算子 : 0 或者 1 代表下标。前面的DataStream[二元组] , 0代表单词 ,1代表单词出现的次数
.sum(1).setParallelism(2) //聚会累加算子
//5、打印结果
result.print("结果").setParallelism(1)
123456
2. 并行度设置之 Execution Environment Level
任务的默认并行度可以通过调用 setParallelism()
方法指定。为了以并行度 3 来执行 所有
的 Operator、Source 和 Sink,可以通过如下方式设置执行环境的并行度
//1、初始化Flink流计算的环境
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//修改并行度
streamEnv.setParallelism(3) //默认所有算子的并行度为3
1234
3. 并行度设置之 Client Level
并行度还可以在客户端提交 Job 到 Flink 时设定。对于 CLI 客户端,可以通过-p 参数指定并行度。
bin/flink run -p 10 WordCount.jar
1
4. 并行度设置之 System Level
在系统级可以通过设置flink-conf.yaml文件中的parallelism.default属性来指定所 有执行环境的默认并行度。
# The parallelism used for programs that did not specify and other parallelism.
parallelism.default: 1
12
5. 并行度案例分析
Flink 集群中有 3 个 TaskManager 节点,每个 TaskManager 的 Slot 数量为 3
全部默认情况下:
全局并行度=2End
:牢记并行度设置的优先级,根据集群配置合理设置参数。
4. Flink 常用API详解
1. 函数阶层
Flink 根据抽象程度分层,提供了三种不同的 API 和库。每一种 API 在简洁性和表达力上有着不同的侧重,并且针对不同的应用场景。
ProcessFunction
ProcessFunction 是 Flink 所提供最底层接口
。ProcessFunction 可以处理一或两条 输入数据流中的单个事件或者归入一个特定窗口内的多个事件。它提供了对于时间和状态的细粒度控制。开发者可以在其中任意地修改状态,也能够注册定时器用以在未来的 某一时刻触发回调函数。因此,你可以利用 ProcessFunction 实现许多有状态的事件 驱动应用所需要的基于单个事件的复杂业务逻辑。DataStream API
DataStream API 为许多通用的流处理操作提供了处理原语。这些操作包括窗口、逐条 记录的转换操作,在处理事件时进行外部数据库查询等。DataStream API 支持 Java 和 Scala 语言,预先定义了例如map()
、reduce()
、aggregate()
等函数。你可以通过扩 展实现预定义接口或使用 Java、Scala 的 lambda 表达式实现自定义的函数。SQL & Table API
:
Flink 支持两种关系型的 API,Table API 和 SQL。这两个 API 都是批处理
和流处理
统一的 API,这意味着在无边界的实时数据流和有边界的历史记录数据流上,关系型 API 会以相同的语义执行查询,并产生相同的结果。Table API 和 SQL 借助了 Apache Calcite 来进行查询的解析,校验以及优化。它们可以与 DataStream 和 DataSet API 无缝集成,并支持用户自定义的标量函数,聚合函数以及表值函数。
另外 Flink 具有数个适用于常见数据处理应用场景的扩展库。
复杂事件处理(CEP)
:
模式检测是事件流处理中的一个非常常见的用例。Flink 的 CEP 库提供了 API,使用户能够以例如正则表达式或状态机的方式指定事件模式。CEP 库与 Flink 的 DataStream API 集成,以便在 DataStream 上评估模式。CEP 库的应用包括 网络入侵检测,业务流程监控和欺诈检测。DataSet API
:
DataSet API 是 Flink 用于批处理
应用程序的核心 API。DataSet API 所提供的基础算子包括 map、reduce、(outer) join、co-group、iterate 等。所有算子都有相应的算法和数据结构支持,对内存中的序列化数据进行操作。如果数据大小超过预留内存,则过量数据将存储到磁盘。Flink 的 DataSet API 的数据处理算法借鉴了传统数据库算法的实现,例如混合散列连接(hybrid hash-join)和外部归并排序 (external merge-sort)。Gelly
:
Gelly 是一个可扩展的图形处理和分析库。Gelly 是在 DataSet API 之上实现 的,并与 DataSet API 集成。因此,它能够受益于其可扩展且健壮的操作符。Gelly 提 供了内置算法,如 label propagation、triangle enumeration 和 PageRank 算法, 也提供了一个简化自定义图算法实现的 Graph API。
2. DataStream 的编程模型
DataStream 的编程模型包括四
个部分:Environment
,DataSource
,Transformation
,Sink
。此乃重点
,接下来主要按照这四部分讲解。
3. Flink 的 DataSource 数据源
基于文件、基于集合、基于Kafka、自定义的DataSource
1. 基于文件的Source
读取本地文件系统的数据,前面的案例已经讲过了。本课程主要讲基于HDFS
文件系统的 Source。首先需要配置 Hadoop 的依赖
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.2</version>
</dependency>
12345678910
代码:
package com.sowhat.flink.source
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object HDFSFileSource {
def main(args: Array[String]): Unit = {
//1、初始化Flink流计算的环境
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//修改并行度
streamEnv.setParallelism(1) //默认所有算子的并行度为1
//2、导入隐式转换
import org.apache.flink.streaming.api.scala._
//读取HDFS文件系统上的文件
val stream: DataStream[String] = streamEnv.readTextFile("hdfs://hadoop101:9000/wc.txt")
//单词统计的计算
val result: DataStream[(String, Int)] = stream.flatMap(_.split(" "))
.map((_, 1))
.keyBy(0)
.sum(1)
//定义sink
result.print()
streamEnv.execute("wordcount")
}
}
123456789101112131415161718192021222324252627282930
2. 基于集合的Source
package com.sowhat.flink.source
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
/**
* 基站日志
* @param sid 基站的id
* @param callOut 主叫号码
* @param callInt 被叫号码
* @param callType 呼叫类型
* @param callTime 呼叫时间 (毫秒)
* @param duration 通话时长 (秒)
*/
case class StationLog(sid: String, var callOut: String, var callInt: String, callType: String, callTime: Long, duration: Long)
object CollectionSource {
def main(args: Array[String]): Unit = {
//1、初始化Flink流计算的环境
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//修改并行度
streamEnv.setParallelism(1) //默认所有算子的并行度为1
//2、导入隐式转换
import org.apache.flink.streaming.api.scala._
val stream: DataStream[StationLog] = streamEnv.fromCollection(Array(
new StationLog("001", "1866", "189", "busy", System.currentTimeMillis(), 0),
new StationLog("002", "1866", "188", "busy", System.currentTimeMillis(), 0),
new StationLog("004", "1876", "183", "busy", System.currentTimeMillis(), 0),
new StationLog("005", "1856", "186", "success", System.currentTimeMillis(), 20)
))
stream.print()
streamEnv.execute()
}
}
1234567891011121314151617181920212223242526272829303132333435
3. 基于Kafka的Source
首 先 需 要 配 置 Kafka 连 接 器 的 依 赖 , 另 外 更 多 的 连 接 器 可 以 查 看 官 网 :连接器
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.3</version>
</dependency>
12345678910
关于Kafka的demo参考 文章
1. 消费普通String
Kafka生产者:
[atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic sowhat
>hello world
>sowhat
1234
消费者
package com.sowhat.flink.source
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
object KafkaSource1 {
def main(args: Array[String]): Unit = {
//1、初始化Flink流计算的环境
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//修改并行度
streamEnv.setParallelism(1) //默认所有算子的并行度为1
//2、导入隐式转换
import org.apache.flink.streaming.api.scala._
//连接Kafka,并且Kafka中的数据是普通字符串(String)
val props = new Properties()
// 链接的Kafka 集群
props.setProperty("bootstrap.servers", "hadoop101:9092,hadoop102:9092,hadoop103:9092")
// 指定组名
props.setProperty("group.id", "fink01")
// 指定KV序列化类
props.setProperty("key.deserializer", classOf[StringDeserializer].getName)
props.setProperty("value.deserializer", classOf[StringDeserializer].getName)
// 从最新数据开始读
props.setProperty("auto.offset.reset", "latest")
// 订阅主题
val stream: DataStream[String] = streamEnv.addSource(new FlinkKafkaConsumer[String]("sowhat", new SimpleStringSchema(), props))
stream.print()
streamEnv.execute()
}
}
1234567891011121314151617181920212223242526272829303132333435363738394041
2. 消费KV形式
Kafka模式就是输入的KV只是平常只用V而已,如果用消费者KV则我们需要代码编写生产者跟消费者。
生产者:
package com.sowhat.flink.source
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import scala.util.Random
object MyKafkaProducer {
def main(args: Array[String]): Unit = {
//连接Kafka的属性
val props = new Properties()
// 链接的集群
props.setProperty("bootstrap.servers", "hadoop101:9092,hadoop102:9092,hadoop103:9092")
// 序列化KV类
props.setProperty("key.serializer", classOf[StringSerializer].getName)
props.setProperty("value.serializer", classOf[StringSerializer].getName)
var producer = new KafkaProducer[String, String](props)
var r = new Random()
while (true) { //死循环生成键值对的数据
val data = new ProducerRecord[String, String]("sowhat", "key" + r.nextInt(10), "value" + r.nextInt(100))
producer.send(data)
Thread.sleep(1000)
}
producer.close()
}
}
123456789101112131415161718192021222324252627282930
消费者:
package com.sowhat.flink.source
import java.util.Properties
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
//2、导入隐式转换
import org.apache.flink.streaming.api.scala._
object KafkaSourceByKeyValue {
def main(args: Array[String]): Unit = {
//1、初始化Flink流计算的环境
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//修改并行度
streamEnv.setParallelism(1) //默认所有算子的并行度为1
//连接Kafka的属性
val props = new Properties()
props.setProperty("bootstrap.servers", "hadoop101:9092,hadoop102:9092,hadoop103:9092")
props.setProperty("group.id", "flink002")
props.setProperty("key.deserializer", classOf[StringDeserializer].getName)
props.setProperty("value.deserializer", classOf[StringDeserializer].getName)
props.setProperty("auto.offset.reset", "latest")
//设置Kafka数据源
val stream: DataStream[(String, String)] = streamEnv.addSource(new FlinkKafkaConsumer[(String, String)]("sowhat", new MyKafkaReader, props))
stream.print()
streamEnv.execute()
}
//自定义一个类,从Kafka中读取键值对的数据
class MyKafkaReader extends KafkaDeserializationSchema[(String, String)] {
//是否流结束
override def isEndOfStream(nextElement: (String, String)): Boolean = {
false
}
// 反序列化
override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String) = {
if (record != null) {
var key = "null"
var value = "null"
if (record.key() != null) {
key = new String(record.key(), "UTF-8")
}
if (record.value() != null) { //从Kafka记录中得到Value
value = new String(record.value(), "UTF-8")
}
(key, value)
} else {
//数据为空
("null", "null")
}
}
//指定类型
override def getProducedType: TypeInformation[(String, String)] = {
createTuple2TypeInformation(createTypeInformation[String], createTypeInformation[String])
}
}
}
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
4. 自定义Source
当然也可以自定义数据源,有两种
方式实现:
- 通过实现
SourceFunction
接口来自定义无并行度(也就是并行度只能为 1)的 Source。 - 通过实现
ParallelSourceFunction
接口或者继承RichParallelSourceFunction
来自 定义有并行度的数据源。
package com.sowhat.flink.source
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import scala.util.Random
case class StationLog(sid: String, var callOut: String, var callInt: String, callType: String, callTime: Long, duration: Long)
/**
* 自定义的Source,需求:每隔两秒钟,生成10条随机基站通话日志数据
*/
class MyCustomerSource extends SourceFunction[StationLog] {
//是否终止数据流的标记
var flag = true;
/**
* 主要的方法,启动一个Source,并且从Source中返回数据
* 如果run方法停止,则数据流终止
*/
override def run(ctx: SourceFunction.SourceContext[StationLog]): Unit = {
val r = new Random()
var types = Array("fail", "basy", "barring", "success")
while (flag) {
1.to(10).map(_ => {
var callOut = "1860000%04d".format(r.nextInt(10000)) //主叫号码
var callIn = "1890000%04d".format(r.nextInt(10000)) //被叫号码
//生成一条数据
new StationLog("station_" + r.nextInt(10), callOut, callIn, types(r.nextInt(4)), System.currentTimeMillis(), r.nextInt(20))
}).foreach(ctx.collect(_)) //发送数据到流
Thread.sleep(2000) //每隔2秒发送一次数据
}
}
//终止数据流
override def cancel(): Unit = {
flag = false;
}
}
object CustomerSource {
def main(args: Array[String]): Unit = {
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setParallelism(1)
val stream: DataStream[StationLog] = streamEnv.addSource(new MyCustomerSource)
stream.print()
streamEnv.execute("SelfSource")
}
}
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849
4. Flink 的 Sink 数据目标
Flink 针对 DataStream 提供了大量的已经实现的数据目标
(Sink),包括文件
、Kafka
、Redis
、HDFS
、Elasticsearch
等等。
1. 基于 HDFS 的 Sink
首先配置支持 Hadoop FileSystem 的连接器依赖。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-filesystem_2.11</artifactId>
<version>1.9.1</version>
</dependency>
12345
Streaming File Sink 能把数据写入 HDFS 中,还可以支持分桶写入
,每一个 分桶 就对 应 HDFS 中的一个目录。默认按照小时来分桶,在一个桶内部,会进一步将输出基于滚动策 略切分成更小的文件。这有助于防止桶文件变得过大。滚动策略也是可以配置的,默认策略会根据文件大小和超时时间来滚动文件,超时时间是指没有新数据写入部分文件(part file)的时间。
需求
:把自定义的Source作为数据源,把基站日志数据 写入HDFS 并且每隔10秒钟生成一个文件
package com.sowhat.flink.sink
import com.sowhat.flink.source.{MyCustomerSource, StationLog}
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
object HDFSSink {
//需求:把自定义的Source作为数据源,把基站日志数据写入HDFS并且每隔10钟生成一个文件
def main(args: Array[String]): Unit = {
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setParallelism(1)
//读取数据源
val stream: DataStream[StationLog] = streamEnv.addSource(new MyCustomerSource)
//默认一个小时一个目录(分桶)
//设置一个滚动策略
val rolling: DefaultRollingPolicy[StationLog, String] = DefaultRollingPolicy.create()
.withInactivityInterval(5000) //不活动的分桶时间
.withRolloverInterval(10000) //每隔10 生成一个文件
.build() //创建
//创建HDFS的Sink
val hdfsSink: StreamingFileSink[StationLog] = StreamingFileSink.forRowFormat[StationLog](
new Path("hdfs://hadoop101:9000/MySink001/"),
new SimpleStringEncoder[StationLog]("UTF-8"))
.withRollingPolicy(rolling)
.withBucketCheckInterval(1000) //检查间隔时间
.build()
stream.addSink(hdfsSink)
streamEnv.execute()
}
}
12345678910111213141516171819202122232425262728293031323334353637383940
2. 基于 Redis的 Sink
Flink 除了内置的 连接器 外,还有一些额外的连接器通过 Apache Bahir 发布,包括:
- Apache ActiveMQ (source/sink)
- Apache Flume (sink)
- Redis (sink)
- Akka (sink)
- Netty (source)
这里我用 Redis 来举例,首先需要配置 Redis 连接器的依赖:需求
:把netcat作为数据源,并且统计每个单词的次数,统计的结果写入Redis数据库中。
导入依赖:
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
12345
代码如下:
package com.sowhat.flink.sink
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
object RedisSink {
//需求:把netcat作为数据源,并且统计每个单词的次数,统计的结果写入Redis数据库中。
def main(args: Array[String]): Unit = {
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setParallelism(1)
import org.apache.flink.streaming.api.scala._
//读取数据源
val stream: DataStream[String] = streamEnv.socketTextStream("hadoop101", 8888)
//计算
val result: DataStream[(String, Int)] = stream.flatMap(_.split(" "))
.map((_, 1))
.keyBy(0) // 等价于groupbyKey
.sum(1)
//把结果写入Redis中 设置连接Redis的配置
val config: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder().setDatabase(3).setHost("hadoop101").setPort(6379).build()
//设置Redis的Sink
result.addSink(new RedisSink[(String, Int)](config, new RedisMapper[(String, Int)] {
//设置redis的命令
override def getCommandDescription = {
new RedisCommandDescription(RedisCommand.HSET, "sowhat")
// https://bahir.apache.org/docs/flink/current/flink-streaming-redis/
}
//从数据中获取Key
override def getKeyFromData(data: (String, Int)) = {
data._1
}
//从数据中获取Value
override def getValueFromData(data: (String, Int)) = {
data._2 + ""
}
}))
streamEnv.execute("redisSink")
}
}
12345678910111213141516171819202122232425262728293031323334353637383940414243
3. 基于 Kafka的 Sink
由于前面有的课程已经讲过 Flink 的 Kafka 连接器,所以还是一样需要配置 Kafka 连接 器的依赖配置,接下我们还是把 WordCout 的结果写入 Kafka:
1. Kafka作为Sink的第一种(String)
需求
:把netcat数据源中每个单词写入Kafka
package com.sowhat.flink.sink
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
object KafkaSinkByString {
//Kafka作为Sink的第一种(String)
//需求:把netcat数据源中每个单词写入Kafka
def main(args: Array[String]): Unit = {
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.streaming.api.scala._
streamEnv.setParallelism(1)
//读取数据源
val stream: DataStream[String] = streamEnv.socketTextStream("hadoop101",8888)
//计算
val words: DataStream[String] = stream.flatMap(_.split(" "))
//把单词写入Kafka
words.addSink(new FlinkKafkaProducer[String]("hadoop101:9092,hadoop102:9092,hadoop103:9092","sowhat",new SimpleStringSchema()))
streamEnv.execute("kafkaSink")
}
}
12345678910111213141516171819202122
写入到Kafka后可以在终端开一个消费者。
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic sowhat
1
2. Kafka作为Sink的第二种(KV)
需求
:把netcat
作为数据源,统计每个单词的数量,并且把统计的结果写入Kafka
package com.sowhat.flink.sink
import java.lang
import java.util.Properties
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.kafka.clients.producer.ProducerRecord
object KafkaSinkByKeyValue {
//Kafka作为Sink的第二种(KV)
//把netcat作为数据源,统计每个单词的数量,并且把统计的结果写入Kafka
def main(args: Array[String]): Unit = {
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.streaming.api.scala._;
streamEnv.setParallelism(1)
//读取数据源
val stream: DataStream[String] = streamEnv.socketTextStream("hadoop101", 8888)
//计算
val result: DataStream[(String, Int)] = stream.flatMap(_.split(" "))
.map((_, 1))
.keyBy(0)
.sum(1)
//创建连接Kafka的属性
var props = new Properties()
props.setProperty("bootstrap.servers", "hadoop101:9092,hadoop102:9092,hadoop103:9092")
//创建一个Kafka的sink
var kafkaSink = new FlinkKafkaProducer[(String, Int)](
"sowhat",
new KafkaSerializationSchema[(String, Int)] { //自定义的匿名内部类
override def serialize(element: (String, Int), timestamp: lang.Long) = {
new ProducerRecord("sowhat", element._1.getBytes, (element._2 + "").getBytes)
}
},
props, //连接Kafka的数学
FlinkKafkaProducer.Semantic.EXACTLY_ONCE //精确一次
)
result.addSink(kafkaSink)
streamEnv.execute("kafka的sink的第二种")
//--property print.key=true Kafka的命令加一个参数
}
}
1234567891011121314151617181920212223242526272829303132333435363738394041
生成写入KV后可以定义消费者:
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning \
--topic sowhat --property print.key=true
Kafka的命令加一个参数
1234
4. 基于HBase的Sink
引入依赖:
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-hbase -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hbase_2.12</artifactId>
<version>1.10.0</version>
</dependency>
123456
代码:
packge com.sowhat.demo
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes
class HBaseWriter extends RichSinkFunction[String] {
var conn: Connection = null
val scan: Scan = null
var mutator: BufferedMutator = null
var count:Int = 0
override def open(parameters: Configuration): Unit = {
val config: org.apache.hadoop.conf.Configuration = HBaseConfiguration.create
config.set(HConstants.ZOOKEEPER_QUORUM, "IP1,IP2,IP3")
config.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 30000)
config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30000)
conn = ConnectionFactory.createConnection(config)
val tableName: TableName = TableName.valueOf("sowhat")
val params: BufferedMutatorParams = new BufferedMutatorParams(tableName)
//设置缓存1m,当达到1m时数据会自动刷到hbase
params.writeBufferSize(100)
mutator = conn.getBufferedMutator(params)
count = 0
}
override def invoke(value: String, context: SinkFunction.Context[_]): Unit = {
val cf1 = "m"
val value1 = value.replace(" ", "")
val put: Put = new Put(Bytes.toBytes("rk" + value1))
put.addColumn(Bytes.toBytes(cf1), Bytes.toBytes("time"), Bytes.toBytes("v" + value1))
mutator.mutate(put)
//每满2000条刷新一下数据
if (count >= 10) {
mutator.flush()
count = 0
}
count = count + 1
}
/**
* 关闭
*/
override def close(): Unit = {
if (conn != null) conn.close()
}
}
---
package com.sowhat.demo
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
object HbaseRw {
def main(args: Array[String]): Unit = {
val properties = new Properties()
properties.setProperty("bootstrap.servers", "10.100.34.111:9092,10.100.34.133:9092")
properties.setProperty("group.id", "timer.hbase")
val env:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val stream: DataStream[String] = env.addSource(new FlinkKafkaConsumer011[String]("sowhat", new SimpleStringSchema(), properties))
stream.addSink(new HBaseWriter)
env.execute("hbase write")
}
}
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
5. 自定义 的 Sink
当然你可以自己定义 Sink,有两种实现方式:
1、实现 SinkFunction
接口。
2、实现 RichSinkFunction
类。后者增加了生命周期的管理功能。比如需要在 Sink 初始化的时候创 建连接对象,则最好使用第二种。需求
:随机生成StationLog对象,写入MySQL数据库的表t_station_log
中
引入依赖:
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.44</version>
</dependency>
12345
代码如下:
package com.sowhat.flink.sink
import java.sql.{Connection, DriverManager, PreparedStatement}
import com.sowhat.flink.source.{MyCustomerSource, StationLog}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
case class StationLog(sid: String, var callOut: String, var callInt: String, callType: String, callTime: Long, duration: Long)
object CustomerJdbcSink {
//需求:随机生成StationLog对象,写入Mysql数据库的表(t_station_log)中
def main(args: Array[String]): Unit = {
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.streaming.api.scala._
streamEnv.setParallelism(1)
val stream: DataStream[StationLog] = streamEnv.addSource(new MyCustomerSource)
//数据写入Mysql,所有需要创建一个自定义的sink
stream.addSink(new MyCustomerJdbcSink)
streamEnv.execute("jdbcSink")
}
/**
* 自定义的Sink类
*/
class MyCustomerJdbcSink extends RichSinkFunction[StationLog]{
var conn :Connection=_
var pst :PreparedStatement=_
//把StationLog对象写入Mysql表中,每写入一条执行一次
override def invoke(value: StationLog, context: SinkFunction.Context[_]): Unit = {
pst.setString(1,value.sid)
pst.setString(2,value.callOut)
pst.setString(3,value.callInt)
pst.setString(4,value.callType)
pst.setLong(5,value.callTime)
pst.setLong(6,value.duration)
pst.executeUpdate()
}
//Sink初始化的时候调用一次,一个并行度初始化一次
//创建连接对象,和Statement对象
override def open(parameters: Configuration): Unit = {
conn =DriverManager.getConnection("jdbc:mysql://localhost/test","root","123123")
pst =conn.prepareStatement("insert into t_station_log (sid,call_out,call_in,call_type,call_time,duration) values (?,?,?,?,?,?)")
}
override def close(): Unit = {
pst.close()
conn.close()
}
}
}
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
5. DataStream转换算子
此时再将中间的转换算子Transformation
,即通过从一个或多个 DataStream 生成新的 DataStream 的过程被称为 Transformation 操作。在转换过程中,每种操作类型被定义为不同的 Operator,Flink 程序能够将多个 Transformation 组成一个 DataFlow 的拓扑。
1. Map [DataStream->DataStream]
调 用 用 户 定 义 的 MapFunction 对 DataStream[T] 数 据 进 行 处 理 , 形 成 新 的 DataStream[T],其中数据格式可能会发生变化,常用作对数据集内数据的清洗
和转换
。例如将输入数据集中的每个数值全部加 1 处理,并且将数据输出到下游数据集。
2. FlatMap [DataStream->DataStream]
该算子主要应用处理输入一个元素产生一个或者多个元素的计算场景,比较常见的是在 经典例子 WordCount 中,将每一行的文本数据切割,生成单词序列如在图所示,对于输入 DataStream[String]通过 FlatMap 函数进行处理,字符串数字按逗号切割,然后形成新的整 数数据集。
val resultStream[String] = dataStream.flatMap { str => str.split(" ") }
1
3. Filter [DataStream->DataStream]
该算子将按照条件对输入数据集进行筛选操作,将符合条件(过滤表达式=true)的数据集输出,将不符合条件的数据过滤掉。如下图所示将输入数据集中偶数过滤出来,奇数从数据集中去除。
val filter:DataStream[Int] = dataStream.filter { _ % 2 == 0 }
1
4. KeyBy [DataStream->KeyedStream]
该算子根据指定的 Key 将输入的 DataStream[T]数据格式转换为 KeyedStream[T],也就是在数据集中执行 Partition 操作,将相同的 Key 值的数据放置在相同的分区中。
默认是根据注定数据的hashcode
来分的。
val test: DataStream[(String, Int)] = streamEnv.fromElements(("1", 5), ("2", 2), ("2", 4), ("1", 3))
val value: KeyedStream[(String, Int), String] = test.keyBy(_._1)
/**
* (String,Int) => 是进行keyBy的数据类型
* String => 是分流的key的数据类型
*/
---
val test: DataStream[(String, Int)] = streamEnv.fromElements(("1", 5), ("2", 2), ("2", 4), ("1", 3))
val value: KeyedStream[(String, Int), Tuple] = test.keyBy(0)
/**
* (String,Int) => 是进行keyBy的数据类型
* Tuple => 是分流的key的数据类型
*/
12345678910111213
5. Reduce [KeyedStream->DataStream]
该算子和 MapReduce 中 Reduce 原理基本一致,主要目的是将输入的KeyedStream
通过 传 入 的 用 户 自 定 义 的 ReduceFunction
滚 动 地 进 行 数 据 聚 合 处 理 , 其 中 定 义 的 ReduceFunciton 必须满足运算结合律
和交换律
。如下代码对传入 keyedStream 数据集中相同的 key 值的数据独立进行求和运算,得到每个 key 所对应的求和值。
val test: DataStream[(String, Int)] = streamEnv.fromElements(("a", 3), ("d", 4), ("c", 2), ("c", 5), ("a", 5))
val value: KeyedStream[(String, Int), Tuple] = test.keyBy(0)
// 滚动对第二个字段进行reduce相加求和
val reduceStream: DataStream[(String, Int)] = value.reduce { (t1, t2) => (t1._1, t1._2 + t2._2) }
1234
结果:
2> (c,2)
3> (a,3)
3> (d,4)
2> (c,7)
3> (a,8)
12345
PS
:对于该结果需要说明下为什么key相同的出现了多次,这主要是Flink流式处理思想的体现,迭代式的输出结果。
6. Aggregations[KeyedStream->DataStream]
Aggregations 是 KeyedDataStream 接口提供的聚合算子,根据指定的字段进行聚合操 作,滚动地产生一系列数据聚合结果。其实是将 Reduce 算子中的函数进行了封装
,封装的 聚合操作有 sum、min、minBy、max、maxBy等,这样就不需要用户自己定义 Reduce 函数。 如下代码所示,指定数据集中第一个字段作为 key,用第二个字段作为累加字段,然后滚动
地对第二个字段的数值进行累加并输出
streamEnv.setParallelism(1)
val test: DataStream[(String, Int)] = streamEnv.fromElements(("a", 3), ("d", 4), ("c", 2), ("c", 5), ("a", 5))
val value: KeyedStream[(String, Int), Tuple] = test.keyBy(0)
// 滚动对第二个字段进行reduce相加求和
val reduceStream: DataStream[(String, Int)] = value.reduce { (t1, t2) => (t1._1, t1._2 + t2._2) }
// 相当于reduce更简化版的 聚合
val sumStream: DataStream[(String, Int)] = value.sum(1)
1234567
结果:
(a,3)
(d,4)
(c,2)
(c,7)
(a,8)
12345
7. Union[DataStream ->DataStream]
Union 算子主要是将两个或者多个输入的数据集合并成一个数据集,需要保证两个数据 集的格式一致
,输出的数据集的格式和输入的数据集格式保持一致。
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object TestUnion {
def main(args: Array[String]): Unit = {
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.streaming.api.scala._
streamEnv.setParallelism(1)
var stream1 = streamEnv.fromElements(("a", 1), ("b", 2))
var stream2 = streamEnv.fromElements(("b", 5), ("d", 6))
var stream3 = streamEnv.fromElements(("e", 7), ("f", 8))
val result: DataStream[(String, Int)] = stream1.union(stream2, stream3)
result.print()
streamEnv.execute()
}
}
12345678910111213141516
结果:
(a,1)
(b,2)
(e,7)
(f,8)
(b,5)
(d,6)
123456
8. Connect、CoMap、CoFlatMap[DataStream ->ConnectedStream->DataStream]
Connect 算子主要是为了合并
两种或者多种不同数据类型
的数据集,合并后会保留原来 数据集的数据类型。
例如:dataStream1 数据集为(String, Int)元祖类型,dataStream2 数据集为 Int 类型,通过 connect 连接算子将两个不同数据类型的流结合在一起,形成格式 为 ConnectedStreams 的数据集,其内部数据为[(String, Int), Int]
的混合数据类型,保留了两个原始数据集的数据类型。
需要注意的是,对于 ConnectedStreams 类型的数据集不能
直接进行类似 Print()的操 作,需要再转换成 DataStream 类型数据集,在 Flink 中 ConnectedStreams 提供的 map()
方 法和flatMap()
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object TestConnect {
def main(args: Array[String]): Unit = {
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.streaming.api.scala._
streamEnv.setParallelism(1)
val stream1: DataStream[(String, Int)] = streamEnv.fromElements(("a", 1), ("b", 2), ("c", 3))
val stream2: DataStream[String] = streamEnv.fromElements("e", "f", "g")
val stream3: ConnectedStreams[(String, Int), String] = stream1.connect(stream2) //注意得到ConnectedStreams,实际上里面的数据没有真正合并
//使用CoMap,或者CoFlatmap
val result: DataStream[(String, Int)] = stream3.map(
//第一个处理的函数
t => {
(t._1, t._2)
},
//第二个处理的函数
t => {
(t, 0)
}
)
result.print()
streamEnv.execute()
}
}
123456789101112131415161718192021222324252627
结果:
(e,0)
(f,0)
(g,0)
(a,1)
(b,2)
(c,3)
123456
注意
:
- Union 之前两个流的类型
必须是一样
,Connect可以不一样
,在之后的 coMap 中再去调 整成为一样的。 - Connect
只能
操作两个流,Union可以
操作多个。
9. Split 和 select [DataStream->SplitStream->DataStream]
Split 算子是将一个 DataStream 数据集按照条件进行拆分
,形成两个数据集的过程, 也是 union 算子的逆向实现。每个接入的数据都会被路由
到一个或者多个输出数据集中。Side Output
import com.sowhat.flink.source.{MyCustomerSource, StationLog}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector
object TestSplitAndSelect {
//需求:从自定义的数据源中读取基站通话日志,把通话成功的和通话失败的分离出来
def main(args: Array[String]): Unit = {
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.streaming.api.scala._
streamEnv.setParallelism(1)
//读取数据源
val stream: DataStream[StationLog] = streamEnv.addSource(new MyCustomerSource)
// this needs to be an anonymous inner class, so that we can analyze the type
val successTag = OutputTag[StationLog]("success")
val nosuccessTag = OutputTag[StationLog]("nosuccess")
val sideoutputStream: DataStream[StationLog] = stream.process(new ProcessFunction[StationLog, StationLog] {
override def processElement(value: StationLog, ctx: ProcessFunction[StationLog, StationLog]#Context, out: Collector[StationLog]): Unit = {
if (value.callType.equals("success")) {
ctx.output(successTag, value)
}
else {
ctx.output(nosuccessTag, value)
}
}
})
sideoutputStream.getSideOutput(successTag).print("成功数据")
sideoutputStream.getSideOutput(nosuccessTag).print("未成功数据")
//切割
val splitStream: SplitStream[StationLog] = stream.split( //流并没有真正切割
log => {
if (log.callType.equals("success")) {
Seq("Success")
} else {
Seq("NOSuccess")
}
}
)
//选择不同的流 根据标签得到不同流
val stream1: DataStream[StationLog] = splitStream.select("Success")
val stream2: DataStream[StationLog] = splitStream.select("NOSuccess")
stream.print("原始数据")
stream1.print("通话成功")
stream2.print("通话不成功")
streamEnv.execute()
}
}
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
函数类和富函数类
前面学过的所有算子几乎都可以自定义一个函数类、富函数类作为参数。因为 Flink 暴露者两种函数类的接口,常见的函数接口有:
- MapFunction
- FlatMapFunction
- ReduceFunction
- 。。。。。
富函数接口
它其他常规函数接口的不同在于:可以获取运行环境的上下文,在上下文环境中可以管理状态
,并拥有一些生命周期方法,所以可以实现更复杂的功能。富函数的接口有:
- RichMapFunction
- RichFlatMapFunction
- RichFilterFunction
- RichSinkFunction
1. 普通函数类型
普通函数类举例:按照指定的时间格式输出每个通话的拨号时间和结束时间。resources目录下station.log文件内容如下:
station_0,18600003612,18900004575,barring,1577080453123,0
station_9,18600003186,18900002113,success,1577080453123,32
station_3,18600003794,18900009608,success,1577080453123,4
station_1,18600000005,18900007729,fail,1577080453123,0
station_1,18600000005,18900007729,success,1577080603123,349
station_8,18600007461,18900006987,barring,1577080453123,0
station_5,18600009356,18900006066,busy,1577080455129,0
station_4,18600001941,18900003949,busy,1577080455129,0
12345678
代码如下:
package com.sowhat.flink.transformation
import java.net.URLDecoder
import java.text.SimpleDateFormat
import java.util.Date
import com.sowhat.flink.source.StationLog
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object TestFunctionClass {
//计算出每个通话成功的日志中呼叫起始和结束时间,并且按照指定的时间格式
//数据源来自本地文件
def main(args: Array[String]): Unit = {
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.streaming.api.scala._
//读取数据源
var filePath = getClass.getResource("/station.log").getPath
filePath = URLDecoder.decode(filePath, "utf-8")
val stream: DataStream[StationLog] = streamEnv.readTextFile(filePath)
.map(line => {
var arr = line.split(",")
new StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
})
//定义一个时间格式
val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
//计算通话成功的起始和结束时间
val result: DataStream[String] = stream.filter(_.callType.equals("success"))
.map(new MyMapFunction(format))
//result.print()
val result1: DataStream[String] = stream.filter(_.callType.equals("success")).map {
x => {
val startTime = x.callTime
val endTime = startTime + x.duration * 1000
"主叫号码:" + x.callOut + ",被叫号码:" + x.callInt + ",呼叫起始时间:" + format.format(new Date(startTime)) + ",呼叫结束时间:" + format.format(new Date(endTime))
}
}
result1.print()
streamEnv.execute()
}
//自定义一个函数类 指定输入 跟输出类型
class MyMapFunction(format: SimpleDateFormat) extends MapFunction[StationLog, String] {
override def map(value: StationLog): String = {
val startTime = value.callTime
val endTime = startTime + value.duration * 1000
"主叫号码:" + value.callOut + ",被叫号码:" + value.callInt + ",呼叫起始时间:" + format.format(new Date(startTime)) + ",呼叫结束时间:" + format.format(new Date(endTime))
}
}
}
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152
2. 富函数类型
富函数类举例
:把呼叫成功的通话信息转化成真实的用户姓名,通话用户对应的用户表 (在 Mysql 数据中)
由于需要从数据库中查询数据,就需要创建连接,创建连接的代码必须写在生命周期的 open 方法中。所以需要使用富函数类。Rich Function
有一个生命周期的概念。典型的生命周期方法有:
- open()方法是 rich function 的
初始化
方法,当一个算子例如 map 或者 filter 被调用 之前 open()会被调用。 - close()方法是生命周期中的最后一个调用的方法,做一些
清理工作
。 - getRuntimeContext()方法提供了函数的 RuntimeContext 的一些信息,例如函数执行的 并行度,任务的名字,以及 state 状态
package com.sowhat.flink.transformation
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
import com.sowhat.flink.source.StationLog
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object TestRichFunctionClass {
/**
* 把通话成功的电话号码转换成真是用户姓名,用户姓名保存在Mysql表中
* @param args
*/
def main(args: Array[String]): Unit = {
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.streaming.api.scala._
//读取数据源
var filePath = getClass.getResource("/station.log").getPath
val stream: DataStream[StationLog] = streamEnv.readTextFile(filePath)
.map(line => {
var arr = line.split(",")
new StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
})
//计算:把电话号码变成用户姓名
val result: DataStream[StationLog] = stream.filter(_.callType.equals("success"))
.map(new MyRichMapFunction)
result.print()
streamEnv.execute()
}
//自定义一个富函数类
class MyRichMapFunction extends RichMapFunction[StationLog, StationLog] {
var conn: Connection = _
var pst: PreparedStatement = _
override def open(parameters: Configuration): Unit = {
conn = DriverManager.getConnection("jdbc:mysql://localhost/test", "root", "123123")
pst = conn.prepareStatement("select name from t_phone where phone_number=?")
}
override def close(): Unit = {
pst.close()
conn.close()
}
override def map(value: StationLog): StationLog = {
// 获取上下文信息 比如获取子线程
println(getRuntimeContext.getTaskNameWithSubtasks)
//查询主叫号码对应的姓名
pst.setString(1, value.callOut)
val result: ResultSet = pst.executeQuery()
if (result.next()) {
value.callOut = result.getString(1)
}
//查询被叫号码对应的姓名
pst.setString(1, value.callInt)
val result2: ResultSet = pst.executeQuery()
if (result2.next()) {
value.callInt = result2.getString(1)
}
value
}
}
}
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
3. 底层 ProcessFunctionAPI
ProcessFunction 是一个低层次的流处理操作,允许返回所有 Stream 的基础构建模块,可以说是Flink的杀手锏
了。
- 访问 Event 本身数据(比如:Event 的时间,Event 的当前 Key 等)
- 管理状态 State(仅在 Keyed Stream 中)
- 管理定时器 Timer(包括:注册定时器,删除定时器等) 总而言之,ProcessFunction 是 Flink 最底层的 API,也是功能最强大的。
需求
:监控每一个手机,如果在 5 秒内呼叫它的通话都是失败的,发出警告信息。注意
: 本demo中会用到状态编程,只要知道状态的意思,不需要掌握。后面的文章中会详细讲解 State 编程。
package com.sowhat.flink.transformation
import com.sowhat.flink.source.StationLog
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector
/**
* 监控每一个手机号码,如果这个号码在5秒内,所有呼叫它的日志都是失败的,则发出告警信息
* 如果在5秒内只要有一个呼叫不是fail则不用告警
*/
/**
* 基站日志
* @param sid 基站的id
* @param callOut 主叫号码
* @param callInt 被叫号码
* @param callType 呼叫类型
* @param callTime 呼叫时间 (毫秒)
* @param duration 通话时长 (秒)
*/
case class StationLog(sid: String, var callOut: String, var callInt: String, callType: String, callTime: Long, duration: Long)
object TestProcessFunction {
def main(args: Array[String]): Unit = {
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.streaming.api.scala._
//读取数据源 通过 netcat 发送 数据源
val stream: DataStream[StationLog] = streamEnv.socketTextStream("IP1", 8888)
.map(line => {
val arr = line.split(",")
new StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
})
// 按照呼入电话分组
val result: DataStream[String] = stream.keyBy(_.callInt)
.process(new MonitorCallFail)
result.print()
streamEnv.execute()
}
//自定义一个底层的类 第一个是key类型,第二个是处理对象类型,第三个是返回类型
class MonitorCallFail extends KeyedProcessFunction[String, StationLog, String] {
//使用一个状态对象记录时间
lazy val timeState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("time", classOf[Long]))
override def processElement(value: StationLog, ctx: KeyedProcessFunction[String, StationLog, String]#Context, out: Collector[String]): Unit = {
//从状态中取得时间
val time:Long = timeState.value()
if (time == 0 && value.callType.equals("fail")) { //表示第一次发现呼叫失败,记录当前的时间
//获取当前系统时间,并注册定时器
val nowTime:Long = ctx.timerService().currentProcessingTime()
//定时器在5秒后触发
val onTime:Long = nowTime + 5 * 1000L
ctx.timerService().registerProcessingTimeTimer(onTime)
//把触发时间保存到状态中
timeState.update(onTime)
}
if (time != 0 && !value.callType.equals("fail")) { //表示有一次成功的呼叫,必须要删除定时器
ctx.timerService().deleteProcessingTimeTimer(time)
timeState.clear() //清空状态中的时间
}
}
//时间到了,定时器执行,
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, StationLog, String]#OnTimerContext, out: Collector[String]): Unit = {
val warnStr:String = "触发的时间:" + timestamp + " 手机号 :" + ctx.getCurrentKey
out.collect(warnStr)
timeState.clear()
}
}
}
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
4. 侧输出流 Side Output
在 Flink 处理数据流时,我们经常会遇到这样的情况:在处理一个数据源时,往往需要将该源中的不同类型的数据做分割处理,如果使用 filter 算子对数据源进行筛选分割的话,势必会造成数据流的多次复制
,造成不必要的性能浪费;flink 中的侧输出
就是将数据 流进行分割,而不对流进行复制的一种分流机制。flink 的侧输出的另一个作用就是对延时迟到
的数据进行处理,这样就可以不必丢弃迟到的数据。在后面的文章中会讲到!案例
:根据基站的日志,请把呼叫成功的 Stream(主流)和不成功的 Stream(侧流) 分别输出。
package com.sowhat.flink.transformation
import com.sowhat.flink.source.StationLog
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector
object TestSideOutputStream {
import org.apache.flink.streaming.api.scala._
var notSuccessTag: OutputTag[StationLog] = new OutputTag[StationLog]("not_success") //不成功的侧流标签
//把呼叫成功的日志输出到主流,不成功的到侧流
def main(args: Array[String]): Unit = {
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//读取数据源
var filePath: String = getClass.getResource("/station.log").getPath
val stream: DataStream[StationLog] = streamEnv.readTextFile(filePath)
.map(line => {
var arr: Array[String] = line.split(",")
new StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
})
val result: DataStream[StationLog] = stream.process(new CreateSideOuputStream(notSuccessTag))
result.print("主流")
//一定要根据主流得到侧流
val sideStream: DataStream[StationLog] = result.getSideOutput(notSuccessTag)
sideStream.print("侧流")
streamEnv.execute()
}
class CreateSideOuputStream(tag: OutputTag[StationLog]) extends ProcessFunction[StationLog, StationLog] {
override def processElement(value: StationLog, ctx: ProcessFunction[StationLog, StationLog]#Context, out: Collector[StationLog]): Unit = {
if (value.callType.equals("success")) {
//输出主流
out.collect(value)
} else {
//输出侧流
ctx.output(tag, value)
}
}
}
}
1234567891011121314151617181920212223242526272829303132333435363738
5. Flink State管理跟恢复
Flink 是一个默认就有状态的分析引擎,前面的 WordCount 案例可以做到单词的数量的累加,其实是因为在内存中保证了每个单词的出现的次数,这些数据其实就是状态数据。但是如果一个 Task 在处理过程中挂掉了,那么它在内存中的状态都会丢失,所有的数据都需要重新计算。从容错和消息处理的语义(At -least-once 和 Exactly-once)上来说,Flink 引入了 State
和CheckPoint
。
State
一般指一个具体的 Task/Operator 的状态(Task Slot/ 转换算子),State 数据默认保存在 Java 的堆
内存中。- CheckPoint(可以理解为
CheckPoint是把State数据持久化存储了
)则表示了一个 Flink Job 在一个特定时刻的一份全局状态快照,即包含了所有Task/Operator
的状态。
1. 常用 State
Flink 有两种常见的 State 类型,分别是:
Keyed State
(键控状态)Operator State
(算子状态)
1. Keyed State(键控状态)
Keyed State:顾名思义就是基于 KeyedStream
上的状态,这个状态是跟特定的 Key 绑定的。KeyedStream 流上的每一个 Key,都对应一个 State。Flink 针对 Keyed State 提供了 以下可以保存 State 的数据结构:
ValueState<T>
:
保存一个可以更新和检索的值(如上所述,每个值都对应到当前的输入数据的 key,因此算子接收到的每个 key 都可能对应一个值)。 这个值可以通过 update(T) 进行更新,通过 T value() 进行检索。ListState<T>
:
保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上 进行检索。可以通过 add(T) 或者 addAll(List) 进行添加元素,通过 Iterableget()
获得整个列表。还可以通过update
(List) 覆盖当前的列表。ReducingState<T>
:
保存一个单值,表示添加到状态的所有值的聚合。接口与 ListState 类似,但使用 add(T) 增加元素,会使用提供的 ReduceFunction 进行聚合。AggregatingState<IN, OUT>
:
保留一个单值,表示添加到状态的所有值的聚合。和 ReducingState 相反的是, 聚合类型可能与 添加到状态的元素的类型不同。 接口与 ListState 类似,但使用 add(IN) 添加的元素会用指定的 AggregateFunction 进行聚 合。FoldingState<T, ACC>
:
保留一个单值,表示添加到状态的所有值的聚合。 与 ReducingState 相反,聚合类型可能与添加到状态的元素类型不同。接口与 ListState 类似,但使用 add(T)添加的元素会用指定的 FoldFunction 折叠成聚合值。MapState<UK, UV>
:
维护了一个映射列表。 你可以添加键值对到状态中,也可以获得 反映当前所有映射的迭代器。使用 put(UK,UV) 或者 putAll(Map<UK,UV>) 添加映射。 使用 get(UK) 检索特定 key。 使用 entries(),keys() 和 values() 分别检索映射、 键和值的可迭代视图。
2. Operator State(算子状态)
Operator State 与 Key 无关,而是与Operator
绑定,整个 Operator 只对应一个 State。 比如:Flink 中的 Kafka Connector 就使用了 Operator State,它会在每个 Connector 实例 中,保存该实例消费 Topic 的所有(partition, offset)映射。
3. Keyed State 案例
demo1:监控每一个手机号码,如果这个号码在5秒内,所有呼叫它的日志都是失败的,demo2 需求
:计算每个手机的呼叫间隔时间,单位是毫秒。
package com.sowhat.flink.state
import java.net.{URL, URLDecoder}
import com.sowhat.flink.BatchWordCount.getClass
import com.sowhat.flink.source.StationLog
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector
/**
* 基站日志
* @param sid 基站的id
* @param callOut 主叫号码
* @param callInt 被叫号码
* @param callType 呼叫类型
* @param callTime 呼叫时间 (毫秒)
* @param duration 通话时长 (秒)
*/
case class StationLog(sid: String, var callOut: String, var callInt: String, callType: String, callTime: Long, duration: Long)
/**
* 第一种方法的实现
* 统计每个手机的呼叫时间间隔,单位是毫秒
*/
object TestKeyedState1 {
def main(args: Array[String]): Unit = {
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.streaming.api.scala._
//读取数据源
val filePath: URL = getClass.getResource("/station.log") //使用相对路径来得到完整的文件路径
val packagePath: String = filePath.getPath().replaceAll("%20", ""); //解决路径中含有空格的情况
val str:String = URLDecoder.decode(packagePath, "utf-8"); //解决路径包含中文的情况
val stream: DataStream[StationLog] = streamEnv.readTextFile(str)
.map(line => {
val arr:Array[String] = line.split(",")
new StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
})
stream.keyBy(_.callOut) //分组
.flatMap(new CallIntervalFunction)
.print()
streamEnv.execute()
}
//输出的是一个二元组(手机号码,时间间隔)
class CallIntervalFunction extends RichFlatMapFunction[StationLog, (String, Long)] {
//定义一个状态,用于保存前一次呼叫的时间
private var preCallTimeState: ValueState[Long] = _
override def open(parameters: Configuration): Unit = {
preCallTimeState = getRuntimeContext.getState(new ValueStateDescriptor[Long]("pre", classOf[Long]))
}
override def flatMap(value: StationLog, out: Collector[(String, Long)]): Unit = {
//从状态中取得前一次呼叫的时间
val preCallTime:Long = preCallTimeState.value()
if (preCallTime == null || preCallTime == 0) { //状态中没有,肯定是第一次呼叫
preCallTimeState.update(value.callTime)
} else { //状态中有数据,则要计算时间间隔
val interval:Long = Math.abs(value.callTime - preCallTime)
out.collect((value.callOut, interval))
}
}
}
}
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
结果:
4> (18600003532,7000)
2> (18600003713,0)
1> (18600003502,9000)
1> (18600003502,0)
1> (18600003502,9000)
1> (18600007699,0)
1> (18600000005,150000)
1234567
stationlog.txt文件信息如下:
station_1,18600000005,18900007729,fail,1577080453123,0
station_1,18600000005,18900007729,success,1577080603123,349
station_8,18600007461,18900006987,barring,1577080453123,0
station_5,18600009356,18900006066,busy,1577080455129,0
station_4,18600001941,18900003949,busy,1577080455129,0
...自己造数据即可
123456
还有第二种简单的方法:调用flatMapWithState
算子
package com.sowhat.flink.state
import java.net.{URL, URLDecoder}
import com.sowhat.flink.source.StationLog
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
/**
* 第二种方法的实现
* 统计每个手机的呼叫时间间隔,单位是毫秒
*/
object TestKeyedState2 {
def main(args: Array[String]): Unit = {
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.streaming.api.scala._
//读取数据源
val filePath: URL = getClass.getResource("/station.log") //使用相对路径来得到完整的文件路径
val packagePath: String = filePath.getPath().replaceAll("%20", ""); //解决路径中含有空格的情况
val str: String = URLDecoder.decode(packagePath, "utf-8"); //解决路径包含中文的情况
val stream: DataStream[StationLog] = streamEnv.readTextFile(str)
.map(line => {
var arr = line.split(",")
new StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
})
stream.keyBy(_.callOut) //分组
//有两种情况1、状态中有上一次的通话时间,2、没有。采用scala中的模式匹配
.mapWithState[(String, Long), StationLog] {
case (in: StationLog, None) => ((in.callOut, 0), Some(in)) //状态中没有值 是第一次呼叫
case (in: StationLog, pre: Some[StationLog]) => { //状态中有值,是第二次呼叫
var interval:Long = Math.abs(in.callTime - pre.get.callTime)
((in.callOut, interval), Some(in))
}
}.filter(_._2 != 0)
.print()
streamEnv.execute()
}
}
1234567891011121314151617181920212223242526272829303132333435363738394041
2. CheckPoint
当程序出现问题需要恢复State
数据的时候,只有程序提供支持才可以实现State
的容错。State
的容错需要依靠 CheckPoint
机制,这样才可以保证 Exactly-once
这种语义,但是注意,它只能保证 Flink 系统内的 Exactly-once,比如 Flink 内置支持的算子。针对 Source 和 Sink 组件,如果想要保证 Exactly-once 的话,则这些组件本身应支持这种语义。
1. CheckPoint 原理
Flink 中基于异步
轻量级的分布式快照技术提供了 Checkpoints
容错机制,分布式快照可以将同一时间点 Task/Operator
的状态数据全局统一快照处理,包括前面提到的 Keyed State
和 Operator State
。Flink 会在输入的数据集上间隔性地生成 checkpoint barrier
, 通过栅栏
(barrier)将间隔时间段内的数据划分到相应的 checkpoint 中。如下图:
比如序列偶数求和跟奇数求和:
2. CheckPoint 参数和设置
默认情况下 Flink 不开启
检查点的,用户需要在程序中通过调用方法配置和开启检查点,另外还可以调整其他相关参数:
- Checkpoint 开启和时间间隔指定: 开启检查点并且指定检查点时间间隔为 1000ms,根据实际情况自行选择,如果状态比较大,则建议适当增加该值。
streamEnv.enableCheckpointing(1000)
exactly-ance
和at-least-once
语义选择:
选择 exactly-once 语义保证整个应用内端到端的数据一致性,这种情况比较适合于数据要求比较高,不允许出现丢数据或者数据重复,与此同时,Flink 的性能也相对较弱,而 at-least-once 语义更适合于时廷和吞吐量要求非常高但对数据的一致性要求不高的场景。 如下通过setCheckpointingMode()
方法来设 定语义模式, 默认情况 使用的是 exactly-once 模式。
streamEnv.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACT LY_ONCE);
//或者
streamEnv.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LE AST_ONCE)
123
- Checkpoint 超时时间:
超时时间指定了每次 Checkpoint 执行过程中的上限时间范围,一旦 Checkpoint 执行时 间超过该阈值,Flink 将会中断 Checkpoint 过程,并按照超时处理。该指标可以通过setCheckpointTimeout
方法设定,默认为10
分钟。streamEnv.getCheckpointConfig.setCheckpointTimeout(50000)
- 检查点之间最小时间间隔:
该参数主要目的是设定两个 Checkpoint 之间的最小时间间隔,防止出现例如状态数据过大而导致 Checkpoint 执行时间过长,从而导致 Checkpoint 积压过多,最终 Flink 应用密集地触发 Checkpoint 操作,会占用了大量计算资源而影响到整个应用的性能。
streamEnv.getCheckpointConfig.setMinPauseBetweenCheckpoints(600)
1
- 最大并行执行的检查点数量:
通过setMaxConcurrentCheckpoints()
方法设定能够最大同时执行的 Checkpoint 数量。 在默认情况下只有一个检查点可以运行,根据用户指定的数量可以同时触发多个 Checkpoint,进而提升 Checkpoint 整体的效率。
streamEnv.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
1
- 是否删除 Checkpoint 中保存的数据:
设置为RETAIN_ON_CANCELLATION
:表示一旦 Flink 处理程序被 cancel 后,会保留 CheckPoint 数据,以便根据实际需要恢复到指定的 CheckPoint。 设置为DELETE_ON_CANCELLATION
:表示一旦 Flink 处理程序被 cancel 后,会删除 CheckPoint 数据,只有 Job 执行失败的时候才会保存 CheckPoint。
//删除
streamEnv.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckp ointCleanup.DELETE_ON_CANCELLATION)
//保留
streamEnv.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckp ointCleanup.RETAIN_ON_CANCELLATION)
1234
- TolerableCheckpointFailureNumber:
设置可以容忍的检查的失败数,超过这个数量则系统自动关闭和停止任务。
streamEnv.getCheckpointConfig.setTolerableCheckpointFailureNumber(1)
1
3. 保存机制 StateBackend(状态后端)
默认情况下,State 会保存在 TaskManager 的内存
中,CheckPoint
会存储在 JobManager
的内存中。State
和 CheckPoint
的存储位置取决于StateBackend
的配置。Flink 一共提供 了 3 种 StateBackend
。包括基于内存的 MemoryStateBackend
、基于文件系统的FsStateBackend
,以及基于 RockDB
作为存储介质的 RocksDBState-Backend
。
1. MemoryStateBackend
基于内存的状态管理具有非常快速
和高效
的特点,但也具有非常多的限制,最主要的就 是内存的容量限制,一旦存储的状态数据过多就会导致系统内存溢出等问题,从而影响整个 应用的正常运行。同时如果机器出现问题,整个主机内存中的状态数据都会丢失,进而无法 恢复任务中的状态数据。因此从数据安全的角度建议用户尽可能地避免
在生产环境中使用 MemoryStateBackend。
// 设定存储空间为10G
streamEnv.setStateBackend(new MemoryStateBackend(10*1024*1024))
12
2. FsStateBackend
和 MemoryStateBackend
有所不同,FsStateBackend 是基于文件系统
的一种状态管理器, 这里的文件系统可以是本地文件系统,也可以是 HDFS 分布式文件系统。FsStateBackend 更适合任务状态非常大的情况,例如应用中含有时间范围非常长的窗口计算,或 Key/value State 状态数据量非常大的场景。
TaskManager仍然使用内存保存数据,但是进行CheckPoint的时候是将数据保存到FS中。
streamEnv.setStateBackend(new FsStateBackend("hdfs://hadoop101:9000/checkpoint/cp1"))
1
3. RocksDBStateBackend
RocksDBStateBackend 是 Flink 中内置的第三方状态管理器,和前面的状态管理器不同,RocksDBStateBackend 需要单独引入相关的依赖包到工程中。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>1.9.1</version>
</dependency>
12345
RocksDBStateBackend 采用异步
的方式进行状态数据的 Snapshot
,任务中的状态数据首先被写入本地 RockDB 中,这样在 RockDB 仅会存储正在进行计算的热数据,而需要进行 CheckPoint 的时候,会把本地的数据直接复制到远端的 FileSystem 中。
与 FsStateBackend 相比,RocksDBStateBackend 在性能上要比 FsStateBackend 高一些,主要是因为借助于 RocksDB 在本地存储了最新热数据,然后通过异步的方式再同步到文件系 统中,但 RocksDBStateBackend
和 MemoryStateBackend
相比性能就会较弱一些。RocksDB 克服了 State 受内存限制的缺点,同时又能够持久化到远端文件系统中,推荐在生产中使用。
streamEnv.setStateBackend(new RocksDBStateBackend ("hdfs://hadoop101:9000/checkpoint/cp2"))
1
4. 全局配置 StateBackend
以上的代码都是单 job
配置状态后端,也可以全局配置状态后端,需要修改 flink-conf.yaml 配置文件:
state.backend: filesystem
filesystem 表示使用 FsStateBackend,
jobmanager 表示使用 MemoryStateBackend
rocksdb 表示使用 RocksDBStateBackend。
---
flink-conf.yaml 配置文件中
state.checkpoints.dir: hdfs://hadoop101:9000/checkpoints
1234567
默认情况下,如果设置了 CheckPoint 选项,则 Flink 只保留最近成功生成的 1 个 CheckPoint,而当 Flink 程序失败时,可以通过最近的 CheckPoint 来进行恢复。但是,如果希望保留多个CheckPoint,并能够根据实际需要选择其中一个进行恢复,就会更加灵活。 添加如下配置,指定最多可以保存的 CheckPoint 的个数。
state.checkpoints.num-retained: 2
1
4. Checkpoint案例
案例
:设置 HDFS 文件系统的状态后端,取消 Job 之后再次恢复 Job。
使用WordCount案例来测试一下HDFS的状态后端,先运行一段时间Job,然后cancel,在重新启动,看看状态是否是连续的
package com.sowhat.flink.state
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object TestCheckPointByHDFS {
//使用WordCount案例来测试一下HDFS的状态后端,先运行一段时间Job,然后cancel,在重新启动,看看状态是否是连续的
def main(args: Array[String]): Unit = {
//1、初始化Flink流计算的环境
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//开启CheckPoint并且设置一些参数
streamEnv.enableCheckpointing(5000) //每隔5秒开启一次CheckPoint
streamEnv.setStateBackend(new FsStateBackend("hdfs://hadoop101:9000/checkpoint/cp1")) //存放检查点数据
streamEnv.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
streamEnv.getCheckpointConfig.setCheckpointTimeout(5000)
streamEnv.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
streamEnv.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) //终止job保留检查的数据
//修改并行度
streamEnv.setParallelism(1) //默认所有算子的并行度为1
//2、导入隐式转换
import org.apache.flink.streaming.api.scala._
//3、读取数据,读取sock流中的数据
val stream: DataStream[String] = streamEnv.socketTextStream("hadoop101", 8888) //DataStream ==> spark 中Dstream
//4、转换和处理数据
val result: DataStream[(String, Int)] = stream.flatMap(_.split(" "))
.map((_, 1)).setParallelism(2)
.keyBy(0) //分组算子 : 0 或者 1 代表下标。前面的DataStream[二元组] , 0代表单词 ,1代表单词出现的次数
.sum(1).setParallelism(2) //聚会累加算子
//5、打印结果
result.print("结果").setParallelism(1)
//6、启动流计算程序
streamEnv.execute("wordcount")
}
}
1234567891011121314151617181920212223242526272829303132333435363738
打包上传到WebUI:
在nc -lk 8888
输入若干单词。然后查找 WebUI 的输出。然后通过WebUI将任务取消。最后尝试将任务重启。
./flink run -d -s hdfs://hadoop101:9000/checkpoint/cp1/精确到跟meta数据同级目录 -c com.sowhat.flink.state.CheckpointOnFsBackend /home/Flink-Demo-1.0-SNAPSHOT.jar
1
也可以通过WebUI 重启,指定 MainClass跟 CheckPoint即可。此处关键在于CheckPoint路径要写对!
5. SavePoint
Savepoints 是检查点的一种特殊实现,底层实现其实也是使用 Checkpoints 的机制。 Savepoints 是用户以手工命令
的方式触发 Checkpoint,并将结果持久化到指定的存储路径 中,其主要目的是帮助用户在升级和维护集群过程中保存系统中的状态数据,避免因为停机运维或者升级应用等正常终止应用的操作而导致系统无法恢复到原有的计算状态的情况,从而无法实现从端到端的 Excatly-Once 语义保证。
配置 Savepoints 的存储路径
在 flink-conf.yaml 中配置 SavePoint 存储的位置,设置后,如果要创建指定 Job 的 SavePoint,可以不用在手动执行命令时指定 SavePoint 的位置。
state.savepoints.dir: hdfs:/hadoop101:9000/savepoints
1
在代码中设置算子 ID
为了能够在作业的不同版本之间以及 Flink 的不同版本之间顺利升级,强烈推荐程序员 通过手动给算子赋予 ID,这些 ID 将用于确定每一个算子的状态范围。如果不手动给各算子 指定 ID,则会由 Flink 自动给每个算子生成一个 ID。而这些自动生成的 ID 依赖于程序的结 构,并且对代码的更改是很敏感的。因此,强烈建议用户手动设置 ID。
package com.sowhat.flink.state
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object TestSavePoints {
def main(args: Array[String]): Unit = {
//1、初始化Flink流计算的环境
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//修改并行度
streamEnv.setParallelism(1) //默认所有算子的并行度为1
//2、导入隐式转换
import org.apache.flink.streaming.api.scala._
//3、读取数据,读取sock流中的数据
val stream: DataStream[String] = streamEnv.socketTextStream("hadoop101",8888) //DataStream ==> spark 中Dstream
.uid("socket001")
//4、转换和处理数据
val result: DataStream[(String, Int)] = stream.flatMap(_.split(" ")).uid("flatmap001")
.map((_, 1)).setParallelism(2).uid("map001")
.keyBy(0)//分组算子 : 0 或者 1 代表下标。前面的DataStream[二元组] , 0代表单词 ,1代表单词出现的次数
.sum(1).uid("sum001")
//5、打印结果
result.print("结果").setParallelism(1)
//6、启动流计算程序
streamEnv.execute("wordcount")
}
}
1234567891011121314151617181920212223242526
触发 SavePoint
//先启动Job
[root@hadoop101 bin]# ./flink run -c com.bjsxt.flink.state.TestSavepoints -d /home/Flink-Demo-1.0-SNAPSHOT.jar
[root@hadoop101 bin]# ./flink list 获取 job 对应ID
//再取消Job
[root@hadoop101 bin]# ./flink savepoint 6ecb8cfda5a5200016ca6b01260b94ce
// 触发SavePoint
[root@hadoop101 bin]# ./flink cancel 6ecb8cfda5a5200016ca6b01260b94ce
1234567
从 SavePoint 启动 Job
大致方法跟上面的CheckPoint启动Job类似。
6. 总结
若干个常用的状态算子大致如何存储的要了解。
CheckPoint的原理主要是图示,理解如何保证精准一致性的。
CheckPoint一般有基于内存的,基于HDFS的跟基于DB的,整体来说基于DB的把数据存储早DB中跟HDFS中是最好的。
SavePoint是手动触发的CheckPoint,一般方便线上迁移的功能等,并且尽量给每一个算子自定义一个UID,
6. Window 窗口
无界数据变为若干个有界数据
。Windows 计算是流式计算中非常常用的数据计算方式之一,通过按照固定时间或长度将数据流切分成不同的窗口,然后对数据进行相应的聚合运算,从而得到一定时间范围内的统计结果。例如统计最近 5 分钟内某基站的呼叫数,此时基站的数据在不断地产生,但是通过 5 分钟的窗口将数据限定在固定时间范围内,就可以对该范围内的有界数据执行聚合处理, 得出最近 5 分钟的基站的呼叫数量。
1. Window分类
1. Global Window 和 Keyed Window
在运用窗口计算时,Flink根据上游数据集是否为KeyedStream类型,对应的Windows 也 会有所不同。
- Keyed Window: 上游数据集如果是 KeyedStream 类型,则调用 DataStream API 的
window()
方法,数据会根据 Key 在不同的 Task 实例中并行分别计算,最后得出针对每个 Key 统计的结果。 - Global Window:如果是 Non-Keyed 类型,则调用
WindowsAll()
方法,所有的数据都会在窗口算子中由到一个 Task 中计算,并得到全局统计结果。
//读取文件数据
val data = streamEnv.readTextFile(getClass.getResource("/station.log").getPath)
.map(line=>{
var arr =line.split(",") new
StationLog(arr(0).trim,arr(1).trim,arr(2).trim,arr(3).trim,arr(4).trim.toLong,arr(5).trim.to Long)
})
//Global Window
data.windowAll(自定义的WindowAssigner)
//Keyed Window
data.keyBy(_.sid).window(自定义的WindowAssigner)
12345678910
2. Time Window 和 Count Window
基于业务数据的方面考虑,Flink 又支持两种类型的窗口,一种是基于时间的窗口叫Time Window
。还有一种基于输入数据数量的窗口叫 Count Window
3. Time Window(时间窗口)
根据不同的业务场景,Time Window 也可以分为三种类型,分别是滚动窗口
(Tumbling Window)、滑动窗口
(Sliding Window)和会话窗口
(Session Window)
- 滚动窗口(Tumbling Window)
滚动窗口是根据固定时间进行切分,且窗口和窗口之间的元素互不重叠
。这种类型的窗 口的最大特点是比较简单。只需要指定一个窗口长度(window size)。
//每隔5秒统计每个基站的日志数量
data.map(stationLog=>((stationLog.sid,1)))
.keyBy(_._1)
.timeWindow(Time.seconds(5))
//.window(TumblingEventTimeWindows.of(Time.seconds(5))) 跟上面同样功能
.sum(1) //聚合
123456
其中时间间隔可以是 Time.milliseconds(x)、Time.seconds(x)或 Time.minutes(x)。
- 滑动窗口(Sliding Window)
滑动窗口也是一种比较常见的窗口类型,其特点是在滚动窗口基础之上增加了窗口滑动时间(Slide Time),且允许窗口数据发生重叠。当 Windows size 固定之后,窗口并不像 滚动窗口按照 Windows Size 向前移动,而是根据设定的 Slide Time 向前滑动。窗口之间的 数据重叠大小根据 Windows size 和 Slide time 决定,当 Slide time 小于 Windows size 便会发生窗口重叠,Slide size 大于 Windows size 就会出现窗口不连续,数据可能不能在 任何一个窗口内计算,Slide size 和 Windows size 相等时,Sliding Windows 其实就是 Tumbling Windows。
//每隔3秒计算最近5秒内,每个基站的日志数量
data.map(stationLog=>((stationLog.sid,1)))
.keyBy(_._1)
.timeWindow(Time.seconds(5),Time.seconds(3)) //.window(SlidingEventTimeWindows.of(Time.seconds(5),Time.seconds(3)))
.sum(1)
12345
- 会话窗口(Session Window)
会话窗口(Session Windows)主要是将某段时间内活跃度较高的数据聚合成一个窗口 进行计算,窗口的触发的条件是Session Gap
,是指在规定的时间内如果没有数据活跃接入
, 则认为窗口结束,然后触发窗口计算结果。需要注意的是如果数据一直不间断地进入窗口, 也会导致窗口始终不触发的情况。与滑动窗口、滚动窗口不同的是,Session Windows 不需 要有固定 windows size 和 slide time,只需要定义 session gap,来规定不活跃数据的时 间上限即可。
//3秒内如果没有数据进入,则计算每个基站的日志数量
data.map(stationLog=>((stationLog.sid,1)))
.keyBy(_._1).window(EventTimeSessionWindows.withGap(Time.seconds(3))).sum(1)
123
4. Count Window(数量窗口)
Count Window 也有滚动窗口、滑动窗口等。由于使用比较少TODO,比如五条数据算一批次这样的统计。
2. Window的API
在以后的实际案例中 Keyed Window
使用最多,所以我们需要掌握 Keyed Window 的算子, 在每个窗口算子中包含了 Windows Assigner、Windows Trigger(窗口触发器)、Evictor (数据剔除器)、Lateness(时延设定)、Output Tag(输出标签)以及 Windows Funciton 等组成部分,其中 Windows Assigner 和 Windows Funciton 是所有窗口算子必须指定
的属性, 其余的属性都是根据实际情况选择指定。
stream.keyBy(...) // 是Keyed类型数据集
.window(...) //指定窗口分配器类型
[.trigger(...)] //指定触发器类型(可选)
[.evictor(...)] //指定evictor或者不指定(可选)
[.allowedLateness(...)] //指定是否延迟处理数据(可选)
[.sideOutputLateData(...)] //指定Output Lag(可选)
.reduce/aggregate/fold/apply() //指定窗口计算函数
[.getSideOutput(...)] //根据Tag输出数据(可选)
12345678
- Windows Assigner: 指定窗口的类型,定义如何将数据流分配到一个或多个窗口。
- Windows Trigger: 指定窗口触发的时机,定义窗口满足什么样的条件触发计算。
- Evictor: 用于数据剔除。
- allowedLateness: 标记是否处理迟到数据,当迟到数据到达窗口中是否触发计算。
- Output Tag: 标记输出标签,然后在通过 getSideOutput 将窗口中的数据根据标签输出。
- Windows Funciton: 定义窗口上数据处理的逻辑,例如对数据进行 sum 操作。
3. 窗口聚合函数
如果定义了 Window Assigner 之后,下一步就可以定义窗口内数据的计算逻辑,这也就是 Window Function 的定义。Flink 中提供了四种类型的 Window Function,分别为 ReduceFunction
、AggregateFunction
以及 ProcessWindowFunction
,(sum 和 max)
等。 前三种类型的 Window Fucntion 按照计算原理的不同可以分为两大类:
- 一类是增量聚合函数:对应有
ReduceFunction
、AggregateFunction
; - 另一类是全量窗口函数,对应有
ProcessWindowFunction
(还有WindowFunction
)。
增量聚合函数计算性能较高,占用存储空间少,主要因为基于中间状态的计算结果,窗口中只维护中间结果状态值,不需要缓存原始数据。而全量窗口函数使用的代价相对较高, 性能比较弱,主要因为此时算子需要对所有属于该窗口的接入数据进行缓存,然后等到窗口触发的时候,对所有的原始数据进行汇总计算。
1. ReduceFunction
Reduce要求输入跟输出类型要一样!这点切记。需求
:每隔5秒统计每个基站的日志数量
object TestReduceFunctionByWindow {
//每隔5秒统计每个基站的日志数量
def main(args: Array[String]): Unit = {
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.streaming.api.scala._
//读取数据源
val stream: DataStream[StationLog] = streamEnv.socketTextStream("hadoop101", 8888)
.map(line => {
val arr = line.split(",")
new StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
})
//开窗
stream.map(log => ((log.sid, 1)))
.keyBy(_._1)
.timeWindow(Time.seconds(5)) //开窗
.reduce((t1, t2) => (t1._1, t1._2 + t2._2))
.print()
streamEnv.execute()
}
}
123456789101112131415161718192021222324
2. AggregateFunction
和 ReduceFunction 相似,AggregateFunction 也是基于中间状态计算结果的增量计算 函数,但 AggregateFunction 在窗口计算上更加通用。AggregateFunction 接口相对 ReduceFunction 更加灵活,输入跟输出类型不要求完全一致,实现复杂度也相对较高。AggregateFunction 接口中定义了三个 需要复写的方法,其中 add()定义数据的添加逻辑,getResult 定义了根据 accumulator 计 算结果的逻辑,merge 方法定义合并 accumulator 的逻辑。初始化,分区内如何处理,分区间如何处理,最终如何输出。
需求
:每隔3秒计算最近5秒内,每个基站的日志数量
object TestAggregatFunctionByWindow {
//每隔3秒计算最近5秒内,每个基站的日志数量
def main(args: Array[String]): Unit = {
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.streaming.api.scala._
//读取数据源
val stream: DataStream[StationLog] = streamEnv.socketTextStream("hadoop101", 8888)
.map(line => {
val arr = line.split(",")
new StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
})
//开窗
val value: DataStream[(String, Long)] = stream.map(log => ((log.sid, 1)))
.keyBy(_._1)
.window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(3))) //开窗,滑动窗口
.aggregate(new MyAggregateFunction, new MyWindowFunction) // 到底是数字对应哪个基站
// aggregate(增量函数,全量函数)
value.print()
streamEnv.execute()
}
/**
* 里面的add方法,是来一条数据执行一次,getResult在窗口结束的时候执行一次
* in,累加器acc,out
* https://blog.csdn.net/chilimei8516/article/details/100796930
*/
class MyAggregateFunction extends AggregateFunction[(String, Int), Long, Long] {
override def createAccumulator(): Long = 0 //初始化一个累加器 acc,开始的时候为0
// 分区内操作
override def add(value: (String, Int), accumulator: Long): Long = accumulator + value._2
// 结果返回
override def getResult(accumulator: Long): Long = accumulator
// 分区间操作
override def merge(a: Long, b: Long): Long = a + b
}
// WindowFunction 输入数据来自于AggregateFunction ,
// 在窗口结束的时候先执行AggregateFunction对象的getResult,然后再执行apply
// in,out,key,window
class MyWindowFunction extends WindowFunction[Long, (String, Long), String, TimeWindow] {
override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[(String, Long)]): Unit = {
out.collect((key, input.iterator.next())) //next得到第一个值,迭代器中只有一个值
}
}
}
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748
3. ProcessWindowFunction
前面提到的ReduceFunction
和 AggregateFunction
都是基于中间状态实现增量计算的 窗口函数,虽然已经满足绝大多数场景,但在某些情况下,统计更复杂的指标可能需要依赖于窗口中所有的数据元素,或需要操作窗口中的状态数据和窗口元数据,这时就需要使用到 ProcessWindowsFunction
,ProcessWindowsFunction
能够更加灵活地支持基于窗口全部数据元素的结果计算 , 例如对整个窗口 数 据排序取TopN, 这样的需要就必须使用ProcessWindowFunction
。
需求
:每隔5秒统计每个基站的日志数量
object TestProcessWindowFunctionByWindow {
//每隔5秒统计每个基站的日志数量
def main(args: Array[String]): Unit = {
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.streaming.api.scala._
streamEnv.setParallelism(1)
//读取数据源
val stream: DataStream[StationLog] = streamEnv.socketTextStream("hadoop101", 8888)
.map(line => {
var arr = line.split(",")
new StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
})
//开窗
stream.map(log => ((log.sid, 1)))
.keyBy(_._1) // .timeWindow(Time.seconds(5))//开窗
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.process(new ProcessWindowFunction[(String, Int), (String, Long), String, TimeWindow] {
//一个窗口结束的时候调用一次(一个分组执行一次) in,out,key,windows
override def process(key: String, context: Context, elements: Iterable[(String, Int)], out: Collector[(String, Long)]): Unit = {
println("------------")
//注意:整个窗口的数据保存到Iterable,里面有很多行数据。Iterable的size就是日志的总条数
out.collect((key, elements.size))
}
}).print()
streamEnv.execute()
}
}
1234567891011121314151617181920212223242526272829
需求
:窗口函数读数据然后将数据写入到neo4j,感觉其实应该用 自定的Sink 更合适一些。
object DealDataFromKafka {
def main(args: Array[String]): Unit = {
val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
environment.setParallelism(1)
val properties: Properties = new Properties()
properties.setProperty("bootstrap.servers", "IP1:9092,IP2:9092")
properties.setProperty("group.id", "timer")
// 从最新数据开始读
properties.setProperty("auto.offset.reset", "latest")
//val dataStream: DataStream[String] = environment.addSource(new FlinkKafkaConsumer011[String]("sowhat", new SimpleStringSchema(), properties))
val dataStream: DataStream[String] = environment.socketTextStream("IP", 8889)
val winData: AllWindowedStream[String, TimeWindow] = dataStream.timeWindowAll(Time.seconds(4))
var pre: Int = 0
var tmp: Int = 0
val timeWithHashCode: DataStream[(Int, String)] = winData.process(new ProcessAllWindowFunction[String, (Int, String), TimeWindow]() {
override def process(context: Context, elements: Iterable[String], out: Collector[(Int, String)]): Unit = {
val driver: Driver = GraphDatabase.driver("bolt://IP:9314", AuthTokens.basic("neo4j", "neo4j0fcredithc"))
val session: Session = driver.session()
elements.foreach(value => {
tmp += 1
var now: Int = value.hashCode
now = tmp
session.run(s"CREATE (a:Test {id:${now}, time:'${value}'})")
if (pre != 0) {
session.run(s"MATCH (begin:Test{id:${pre}}) ,(end:Test{id:${now}}) MERGE (begin)-[like:Time_Link]->(end)")
}
out.collect((tmp, s" MATCH (begin:Test{id:${pre}}) ,(end:Test{id:${now}}) MERGE (begin)-[like:Time_Link]->(end)"))
pre = now
}
)
// session.close()
// driver.close()
}
})
timeWithHashCode.print("HashCode With time:")
environment.execute("getData")
}
}
1234567891011121314151617181920212223242526272829303132333435363738394041
End
窗口的分类从不同的维度来说,
- 上游是否为KeyedStream,不同数据集调用不同方法。
- 根据上游数据是时间窗口(滚动窗口、滑动窗口、会话窗口)还是数据量窗口。
- 窗口若干API调用方法,窗口的聚合函数(reduceFunction、AggregateFunction、ProcessWindowFunction、WindowFunction)。