免费咨询热线
13621929115Flink概念模型1. 核心概念Streams(流),流分为有界流和无界流有界流指的是有固定大小,不随时间增加而增长的数据,比如我们保存在 Hive 中的一个表;而无界流指的是数据随着时间增加而增长,计算状态持续进行,比如我们消费 Kafka 中的消息,消息持续不断,那么计算也会持续进行不会结束。
S展示沙盘tate(状态),所谓的状态指的是在进行流式计算过程中的信息一般用作容错恢复和持久化,流式计算在本质上是增量计算,也就是说需要不断地查询过去的状态状态在 Flink 中有十分重要的作用,例如为了确保 Exactly-once 语义需要将数据写到状态中;此外,状态的持久化存储也是集群出现 Fail-o展示沙盘ver 的情况下自动重启的前提条件。
Time(时间),Flink 支持了 Event time、Ingestion time、Processing time 等多种时间语义,时间是我们在进行 Flink 程序开发时判断业务状态是否滞后和延迟的重要依据。
API:Flink 自身提供了不同级别的抽象来支展示沙盘持我们开发流式或者批量处理程序,由上而下可分为 SQL / Table API、DataStream API、ProcessFunction 三层,开发者可以根据需要选择不同层级的 API 进行开发。
2. 编程模型Flink 程序的基础构建模块是流(Streams)和转换(Transformatio展示沙盘ns),每一个数据流起始于一个或多个 Source,并终止于一个或多个 Sink数据流类似于有向无环图。
(DAG)3. Flink 集群模型和角色在实际生产中,Flink 都是以集群在运行,在运行的过程中包含了两类进程JobManager:它扮演的是集群管理者的角色,负责调度任务、协调 checkp展示沙盘oints、协调故障恢复、收集 Job 的状态信息,并管理 Flink 集群中的从节点 TaskManager。
TaskManager:实际负责执行计算的 Worker,在其上执行 Flink Job 的一组 Task;TaskManager 还是所在节点的管理员,它负责把该节点上的服务器信息比如内展示沙盘存、磁盘、任务运行情况等向 JobManager 汇报。
Client:用户在提交编写好的 Flink 工程时,会先创建一个客户端再进行提交,这个客户端就是 Client,Client 会根据用户传入的参数选择使用 yarn per job 模式、stand-alone 模式还是 yarn-sessi展示沙盘on 模式将 Flink 程序提交到集群。
4. Flink 资源和资源组Flink 集群中,一个 TaskManger 就是一个 JVM 进程,并且会用独立的线程来执行 task,为了控制一个 TaskManger 能接受多少个 task,可以简单的把 Task Slot 理解为 TaskManag展示沙盘er 的计算资源子集。
假如一个 TaskManager 拥有 5 个 slot,那么该 TaskManager 的计算资源会被平均分为 5 份,不同的 task 在不同的 slot 中执行,避免资源竞争但是需要注意的是,slot 仅仅用来做内存的隔离,对 CPU 不起作用。
那么运行在同一个 JVM 展示沙盘的 task 可以共享 TCP 连接,减少网络传输,在一定程度上提高了程序的运行效率,降低了资源消耗与此同时,Flink 还允许将不能形成算子链的两个操作,比如下图中的 flatmap 和 key&sink 放在一个 TaskSlot 里执行以达到资源共享的目的。
5. 架构&Flink四层图结构Fl展示沙盘ink 也采用了经典的主从模式,DataFlow Graph 与 Storm 形成的拓扑 Topology 结构类似,Flink 程序启动后,会根据用户的代码处理成 Stream Graph,然后优化成为 JobGraph,JobManager 会根据 JobGraph 生成 ExecutionGr展示沙盘aph。
ExecutionGraph 才是 Flink 真正能执行的数据结构,当很多个 ExecutionGraph 分布在集群中,就会形成一张网状的拓扑结构Flink 中的执行图可以分成四层:StreamGraph -> JobGraph -> ExecutionGraph -> 物理执行图。
l 展示沙盘StreamGraph:是根据用户通过 Stream API 编写的代码生成的最初的图用来表示程序的拓扑结构l JobGraph:StreamGraph经过优化后生成了 JobGraph,提交给 JobManager 的数据结构。
主要的优化为,将多个符合条件的节点 chain 在一起作为一个节点,这展示沙盘样可以减少数据在节点之间流动所需要的序列化/反序列化/传输消耗l ExecutionGraph:JobManager 根据 JobGraph 生成ExecutionGraph。
ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构l 物理执行图:JobManager 根展示沙盘据 ExecutionGraph 对 Job 进行调度后,在各个TaskManager 上部署 Task 后形成的“图”,并不是一个具体的数据结构。
StreamGraph:根据用户通过 Stream API 编写的代码生成的最初的图l StreamNode:用来代表 operator 的类,并具有所展示沙盘有相关的属性,如并发度、入边和出边等l StreamEdge:表示连接两个StreamNode的边。
JobGraph:StreamGraph经过优化后生成了 JobGraph,提交给 JobManager 的数据结构l JobVertex:经过优化后符合条件的多个StreamNode可能会chain展示沙盘在一起生成一个JobVertex,即一个JobVertex包含一个或多个operator,JobVertex的输入是JobEdge,输出是IntermediateDataSet。
l IntermediateDataSet:表示JobVertex的输出,即经过operator处理产生的数据集produ展示沙盘cer是JobVertex,consumer是JobEdgel JobEdge:代表了job graph中的一条数据传输通道。
source 是 IntermediateDataSet,target 是 JobVertex即数据通过JobEdge由IntermediateDataSet传递给目标Job展示沙盘VertexExecutionGraph
:JobManager 根据 JobGraph 生成ExecutionGraphExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构l ExecutionJobVertex:和JobGraph中的JobVertex一一对应。
每一展示沙盘个ExecutionJobVertex都有和并发度一样多的 ExecutionVertexl ExecutionVertex:表示ExecutionJobVertex的其中一个并发子任务,输入是ExecutionEdge,输出是IntermediateResultPartition。
l Interm展示沙盘ediateResult:和JobGraph中的IntermediateDataSet一一对应一个IntermediateResult包含多个IntermediateResultPartition,其个数等于该operator的并发度。
l IntermediateResultPartition:表示展示沙盘ExecutionVertex的一个输出分区,producer是ExecutionVertex,consumer是若干个ExecutionEdge。
l ExecutionEdge:表示ExecutionVertex的输入,source是IntermediateResultPartition,targ展示沙盘et是ExecutionVertexsource和target都只能是一个。
l Execution:是执行一个 ExecutionVertex 的一次尝试当发生故障或者数据需要重算的情况下 ExecutionVertex 可能会有多个 ExecutionAttemptID一个 Execution 通展示沙盘过 ExecutionAttemptID 来唯一标识。
JM和TM之间关于 task 的部署和 task status 的更新都是通过 ExecutionAttemptID 来确定消息接受者 物理执行图:JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个TaskM展示沙盘anager 上部署 Task 后形成的“图”,并不是一个具体的数据结构。
l Task:Execution被调度后在分配的 TaskManager 中启动对应的 TaskTask 包裹了具有用户执行逻辑的 operatorl ResultPartition:代表由一个Task的生成的数据,和Exec展示沙盘utionGraph中的IntermediateResultPartition一一对应。
l ResultSubpartition:是ResultPartition的一个子分区每个ResultPartition包含多个ResultSubpartition,其数目要由下游消费 Task 数和 Distr展示沙盘ibutionPattern 来决定。
l InputGate:代表Task的输入封装,和JobGraph中JobEdge一一对应每个InputGate消费了一个或多个的ResultPartitionl InputChannel:每个InputGate会包含一个以上的InputChannel,和Exe展示沙盘cutionGraph中的ExecutionEdge一一对应,也和ResultSubpartition一对一地相连,即一个InputChannel接收一个ResultSubpartition的输出。
6. 反压(BackPressure)反压是分布式处理系统中经常遇到的问题,当消费者速度低于生产者的速展示沙盘度时,则需要消费者将信息反馈给生产者使得生产者的速度能和消费者的速度进行匹配7. 时间的种类针对stream数据中的时间,可以分为以下三种:。
Ø Event Time:事件产生的时间,它通常由事件中的时间戳描述Ø Ingestion time:事件(日志,数据,消息)进入Flink的时间(不考虑)Ø展示沙盘 Processing Time:事件被处理时当前系统的时间。
8. 容错Flink 基于两阶段提交实现了精确的一次处理语义Storm 在容错方面只支持了 Record 级别的 ACK-FAIL,发送出去的每一条消息,都可以确定是被成功处理或失败处理,因此 Storm 支持至少处理一次语义。
9. 分布展示沙盘式缓存分布式缓存最初的思想诞生于 Hadoop 框架,Hadoop 会将一些数据或者文件缓存在 HDFS 上,在分布式环境中让所有的计算节点调用同一个配置文件在 Flink 中,Flink 框架开发者们同样将这个特性进行了实现。
Flink 提供的分布式缓存类型 Hadoop,目的是为了在分布式环境中展示沙盘让每一个 TaskManager 节点保存一份相同的数据或者文件,当前计算节点的 task 就像读取本地文件一样拉取这些配置如何使用分布式缓存,使用分布式缓存有两个步骤。
第一步:首先需要在 env 环境中注册一个文件,该文件可以来源于本地,也可以来源于 HDFS ,并且为该文件取一个名字第二步:在使展示沙盘用分布式缓存时,可根据注册的名字直接获取10. 状态所谓的状态,其实指的是 Flink 程序的中间计算结果。
Flink 支持了不同类型的状态,并且针对状态的持久化还提供了专门的机制和状态管理器When working with state, it might also be useful to re展示沙盘ad about Flink’s state backends. Flink provides different state backends that specify how and where state is stored. State can be located on Java’s he展示沙盘ap or off-heap. Depending on your state backend, Flink can also manage the state for the application, meaning Flink deals with the memory management (展示沙盘possibly spilling to disk if necessary) to allow applications to hold very large state. State backends can be configured without changing your applica展示沙盘tion logic.
Flink 的官网同样给出了适用于状态计算的几种情况:l When an application searches for certain event patterns, the state will store the sequence of events encounter展示沙盘ed so far
l When aggregating events per minute/hour/day, the state holds the pending aggregatesl When training a machine learning model over a stream o展示沙盘f data points, the state holds the current version of the model parameters
l When historic data needs to be managed, the state allows efficient access 展示沙盘to events that occurred in the past
以上四种情况分别是:复杂事件处理获取符合某一特定时间规则的事件、聚合计算、机器学习的模型训练、使用历史的数据进行计算10.1 状态的类型Flink中有两种基本类型的State:Keyed State,Operator State,展示沙盘他们两种都可以以两种形式存在:原始状态(raw state)和托管状态(managed state)。
托管状态:由Flink框架管理的状态,我们通常使用的就是这种原始状态:由用户自行管理状态具体的数据结构,框架在做checkpoint的时候,使用byte[]来读写状态内容,对其内部数据结构一无所知通展示沙盘常在DataStream上的状态推荐使用托管的状态,当实现一个用户自定义的operator时,会使用到原始状态。
但是我们工作中一般不常用,所以我们不考虑他Operator Statel operator state是task级别的state,说白了就是每个task对应一个statel Kafka C展示沙盘onnector source中的每个分区(task)都需要记录消费的topic的partition和offset等信息。
l operator state 只有一种托管状态:ValueStateKeyed Statel keyed state 记录的是每个key的状态l Keyed state托管状展示沙盘态有六种类型:ValueState
ListStateMapStateReducingStateAggregatingStateFoldingState10.2 状态后端种类和配置默认情况下,Flink 的状态会保存在 taskmanager 的内存中,Flink 提供了三种可用的状态后端用于在不同情展示沙盘况下进行状态后端的保存。
MemoryStateBackendFsStateBackendRocksDBStateBackendMemoryStateBackend默认情况下,状态信息是存储在 TaskManager 的堆内存中的,checkpoint 的时候将状态保存到JobManager 的堆内存展示沙盘中。
MemoryStateBackend 适用于我们本地调试使用,来记录一些状态很小的 Job 状态信息每个独立的状态(state)默认限制大小为 5MB,可以通过构造函数增加容量状态的大小不能超过 akka 的 Framesize 大小
聚合后的状态必须能够放进 JobManager 的内存中缺点:展示沙盘只能保存数据量小的状态状态数据有可能会丢失优点:开发测试很方便FsStateBackend状态信息存储在 TaskManager 的堆内存中的,checkpoint 的时候将状态保存到指定的文件中 (HDFS等文件系统)
缺点:状态大小受TaskManager内存限制(默认支持5M)优点:状态访问速度展示沙盘很快状态信息不会丢失用于: 生产,也可存储状态数据量大的情况RocksDBStateBackend状态信息存储在 RocksDB 数据库 (key-value 的数据存储服务), 最终保存在本地文件中
checkpoint 的时候将状态保存到指定的文件中 (HDFS 等文件系统)与 FsStateBa展示沙盘ckend 不同的是,RocksDBStateBackend 将正在运行中的状态数据保存在 RocksDB 数据库中,RocksDB 数据库默认将数据存储在 TaskManager 运行节点的数据目录下。
这意味着,RocksDBStateBackend 可以存储远超过 FsStateBackend 展示沙盘的状态,可以避免向 FsStateBackend 那样一旦出现状态暴增会导致 OOM,但是因为将状态数据保存在 RocksDB 数据库中,吞吐量会有所下降。
缺点:状态访问速度有所下降优点:可以存储超大量的状态信息状态信息不会丢失用于:也适用于大作业、状态较大、全局高可用的那些任务10.3 State展示沙盘backend配置方式(1)单任务调整修改当前任务代码env.setStateBackend(new
FsStateBackend("hdfs://namenode:9000/flink/checkpoints"));
或者new MemoryStateBackend展示沙盘()
或者new RocksDBStateBackend(filebackend, true);【需要添加第三方依赖】
(2)全局调整修改flink-conf.yamlstate.backend: filesystem
state.checkpoints.dir: hd展示沙盘fs://namenode:9000/flink/checkpoints
注意:state.backend的值可以是下面几种:jobmanager(MemoryStateBackend),
filesystem(FsStateBackend), rocksdb(Roc展示沙盘ksDBStateBackend)
11. checkpoint 机制11.1 checkpoint概述(1)为了保证state的容错性,Flink需要对state进行checkpoint(2)Checkpoint是Flink实现容错机制最核心的功能,它能够根据配置周期性地基于Stream中各个。
Op展示沙盘erator/task的状态来生成快照,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些快照进行恢复,从而修正因为故障带来的程序数据异常(3)Flink的checkpoint机制可以与(stream和state)的持久化存储交互的前提:
持久化的s展示沙盘ource,它需要支持在一定时间内重放事件这种sources的典型例子是持久化的消息队列(比如Apache Kafka,RabbitMQ等)或文件系统(比如HDFS,S3,GFS等)用于state的持久化存储,例如分布式文件系统(比如HDFS,S3,GFS等)。
11.2 checkpoint配置默认展示沙盘checkpoint功能是disabled的,想要使用的时候需要先启用,checkpoint开启之后,checkPointMode有两种,Exactly-once和At-least-once,默认的checkPointMode是Exactly-once,
Exactly-once对于大多数应用来说是最展示沙盘合适的At-least-once可能用在某些延迟超低的应用程序(始终延迟为几毫秒)默认checkpoint功能是disabled的,想要使用的时候需要先启用
StreamExecutionEnvironment env =
StreamExecutionEnviron展示沙盘ment.getExecutionEnvironment();
// 每隔1000 ms进行启动一个检查点【设置checkpoint的周期】
env.enableCheckpointing(1000);
// 高级选项:
// 设置模式展示沙盘为exactly-once (这是默认值)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 确保检查点之间有至少500 ms的间隔【checkpoin展示沙盘t最小间隔】
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// 检查点必须在一分钟内完成,或者被丢弃【checkpoint的超时时间】
env.getCheckpointC展示沙盘onfig().setCheckpointTimeout(60000);
// 同一时间只允许进行一个检查点
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 表示一旦Flink处展示沙盘理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint【详细解释见备注】
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCl展示沙盘eanup.RETAIN_ON_CANCELLATION);。
12. 故障恢复和重启策略12.1 故障恢复Flink 支持了不同级别的故障恢复策略,jobmanager.execution.failover-strategy 的可配置项有两种:full 和 region。
Full: 集群中的 Tas展示沙盘k 发生故障,那么该任务的所有 Task 都会发生重启Region: Flink 会把我们的任务分成不同的 Region,当某一个 Task 发生故障时,Flink 会计算需要故障恢复的最小 Region。
Flink 在判断需要重启的 Region 时,采用了以下的判断逻辑:发生错误的 Task 所展示沙盘在的 Region 需要重启;如果当前 Region 的依赖数据出现损坏或者部分丢失,那么生产数据的 Region 也需要重启;
为了保证数据一致性,当前 Region 的下游 Region 也需要重启12.2 重启策略Flink支持不同的重启策略,以在故障发生时控制作业如何重启,集群在启动时会伴随一展示沙盘个默认的重启策略,在没有定义具体重启策略时会使用该默认策略。
如果在工作提交时指定了一个重启策略,该策略会覆盖集群的默认策略,默认的重启策略可以通过 Flink 的配置文件 flink-conf.yaml 指定配置参数restart-strategy 定义了哪个策略被使用。
常用的重启策略(1)固定间隔展示沙盘 (Fixed delay)(2)失败率 (Failure rate)(3)无重启 (No restart)如果没有启用 checkpointing,则使用无重启 (no restart) 策略。
如果启用了 checkpointing,但没有配置重启策略,则使用固定间隔 (fixed-delay) 展示沙盘策略, 尝试重启次数默认值是:Integer.MAX_VALUE,重启策略可以在flink-conf.yaml中配置,表示全局的配置。
也可以在应用代码中动态指定,会覆盖全局配置12.3 重启策略配置固定间隔 (Fixed delay)第一种:全局配置 flink-conf.yamlrestart-s展示沙盘trategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
第二种:应用代码设置env.setRestartStrategy展示沙盘(RestartStrategies.fixedDelayRestart(
3, // 尝试重启的次数
Time.of(10, TimeUnit.SECONDS) // 间隔
));
失败率 (Failure rate)第一种:全局配置 flink-co展示沙盘nf.yamlrestart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure展示沙盘-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s
第二种:应用代码设置env.setRestartStrategy(RestartStrategies.failureRateRestart(
展示沙盘3, // 一个时间段内的最大失败次数
Time.of(5, TimeUnit.MINUTES), // 衡量失败次数的是时间段
Time.of(10, TimeUnit.SECONDS) // 间隔
));
无重启第一种:全局配置 flink-conf展示沙盘.yamlrestart-strategy: none第二种:应用代码设置env.setRestartStrategy(RestartStrategies.noRestart());
12.4 多checkpoint默认情况下,如果设置了Checkpoint选项,则Flink只保留最近成功生成的1个C展示沙盘heckpoint,而当Flink程序失败时,可以从最近的这个Checkpoint来进行恢复。
但是,如果我们希望保留多个Checkpoint,并能够根据实际需要选择其中一个进行恢复,这样会更加灵活,比如,我们发现最近4个小时数据记录处理有问题,希望将整个状态还原到4小时之前Flink可以支持保留多个展示沙盘Checkpoint,需要在Flink的配置文件conf/flink-conf.yaml中,添加如下配置,指定最多需要保存Checkpoint的个数:。
state.checkpoints.num-retained: 2013. Window13.1 Window概述聚合事件(比如计数、求和)在流上的展示沙盘工作方式与批处理不同比如,对流中的所有元素进行计数是不可能的,因为通常流是无限的(无界的)。
所以,流上的聚合需要由 window 来划定范围,比如 “计算过去的5分钟” ,或者 “最后100个元素的和” window是一种可以把无限数据切割为有限数据块的手段窗口可以是 时间驱动的 【Time Win展示沙盘dow】(比如:每30秒)或者 数据驱动的【Count Window】。
(比如:每100个元素)13.2 Window类型聚合事件(比如计数、求和)在流上的工作方式与批处理不同比如,对流中的所有元素进行计数是不可能的,因为通常流是无限的(无界的)所以,流上的聚合需要由 window 来划定范围,比如展示沙盘 “计算过去的5分钟” ,或者 “最后100个元素的和” 。
window是一种可以把无限数据切割为有限数据块的手段窗口可以是 时间驱动的 【Time Window】(比如:每30秒)或者 数据驱动的【Count Window】(比如:每100个元素)窗口通常被区分为不同的类型:
tumbling wi展示沙盘ndows:滚动窗口 【没有重叠】 (time,count)sliding windows:滑动窗口 【有重叠】(time,count)session windows:会话窗口 (time)
global windows: 没有窗口Tumblingwindows:滚动窗口【没有重叠】sliding w展示沙盘indowSession windowGlobal Window13.3 Window 操作
Keyed Window & Non-Keyed Window14. CEP复杂事件处理Complex Event Processing(CEP)是 Flink 提供的一个非常亮眼的功能,CEP, is ev展示沙盘ent processing that combines data from multiple sources to infer events or patterns that suggest more complicated circumstances. The goal of complex e展示沙盘vent processing is to identify meaningful events (such as opportunities or threats) and respond to them as quickly as possible.
在我们的实际生产中,随着数据的实时性要求越来越展示沙盘高,实时数据的量也在不断膨胀,在某些业务场景中需要根据连续的实时数据,发现其中有价值的那些事件14.1 程序结构Flink CEP 的程序结构主要分为两个步骤:。
定义模式匹配结果第一步,定义一个模式 Pattern,在这里定义了一个这样的模式,即在所有接收到的事件中匹配那些以 id 等于 42 的事展示沙盘件,然后匹配 volume 大于 10.0 的事件,继续匹配一个 name 等于 end 的事件;
第二步,匹配模式并且发出报警,根据定义的 pattern 在输入流上进行匹配,一旦命中我们的模式,就发出一个报警14.2 模式定义Flink 支持了非常丰富的模式定义,这些模式也是我们实现复杂业务逻辑的展示沙盘基础。
Copyright © 2002-2020 上海润之模型设计有限公司 版权所有 展示模型,展品模型,展厅模型,展示道具,展厅展品,展品道具,模型定制,模型公司,上海模型公司 备案号:沪ICP备20018260号