当前位置:首页 » 服务存储 » flink拿什么存储数据
扩展阅读
webinf下怎么引入js 2023-08-31 21:54:13
堡垒机怎么打开web 2023-08-31 21:54:11

flink拿什么存储数据

发布时间: 2022-11-21 19:55:34

‘壹’ 【Flink 精选】阐述 Flink 的容错机制,剖析 Checkpoint 实现流程

Flink 容错机制主要是 状态的保存和恢复,涉及 state backends 状态后端、checkpoint 和 savepoint,还有 Job 和 Task 的错误恢复

Flink 状态后端是指 保存 Checkpoint 数据的容器 ,其分类有 MemoryStateBackend、FsStateBackend、RocksDBStateBackend ,状态的分类有 operator state 和 keyed state

Flink 状态保存和恢复主要依靠 Checkpoint 机制和 Savepoint 机制,两者的区别如下表所示。

快照的概念来源于相片,指照相馆的一种冲洗过程短的照片。在计算机领域, 快照是数据存储的某一时刻的状态记录 Flink Snapshot 快照是指作业状态的全局一致记录 。一个完整的快照是包括 source 算子的状态(例如,消费 kafka partition 的 offset)、状态算子的缓存数据和 sink 算子的状态(批量缓存数据、事务数据等)。

Checkpoint 检查点可以自动产生快照,用于Flink 故障恢复 。Checkpoint 具有分布式、异步、增量的特点。

Savepoint 保存点是用户手动触发的,保存全量的作业状态数据 。一般使用场景是作业的升级、作业的并发度缩放、迁移集群等。

Flink 是采用轻量级的分布式异步快照,其实现是采用栅栏 barrier 作为 checkpoint 的传递信号,与业务数据一样无差别地传递下去 ,目的是使得数据流被切分成微批,进行 checkpoint 保存为 snapshot。当 barrier 经过流图节点的时候,Flink 进行 checkpoint 保存状态数据。
如下图所示,checkpoint n 包含每个算子的状态,该状态是指checkpoint n 之前的全部事件,而不包含它之后的所有事件。

针对用户作业出现故障而导致结果丢失或者重复的问题,Flink 提供3种语义:
At-Least-Once 最少一次 :不会丢失数据,但可能会有重复结果。
Exactly-Once 精确一次 :checkpoint barrier 对齐机制可以保障精确一次。

FailureRateRestartStrategy :允许在指定时间间隔内的最大失败次数,同时可以设置重启延时时间。
FixedDelayRestartStrategy :允许指定的失败次数,同时可以设置重启延时时间。
NoRestartStrategy :不需要重启,即 Job 直接失败。
ThrowingRestartStrategy :不需要重启,直接抛异常。
Job Restart 策略可以通过 env 设置。

上述策略的父类接口是RestartStrategy,其关键是restart(重启操作)。

RestartAllStrategy :重启全部 task,默认策略。
RestartIndivialStrategy :恢复单个 task。如果该 task 没有source,可能导致数据丢失。
NoOpFailoverStrategy :不恢复 task。
上述策略的父类接口是FailoverStrategy,其关键是Factory的create(创建 strategy)、onTaskFailure(处理错误)。

如何产生可靠的全局一致性快照是分布式系统的难点,其传统方案是使用的全局时钟,但存在单点故障、数据不一致等可靠性问题 。为了解决该问题, Chandy-Lamport 算法采用 marker 的传播来代替全局时钟

① 进程 Pi 记录自己的进程状态,同时生产一个标识信息 marker(与正常 message 不同),通过 ouput channel 发送给系统里面的其他进程。
② 进程 Pi 开始记录所有 input channel 接收到的 message

进程 Pj 从 input channel Ckj 接收到 marker。如果 Pj 还没有记录自己的进程状态,则 Pj 记录自己的进程状态,向 output channel 发送 marker;否则 Pj 正在记录自己的进程状态(该 marker 之前的 message)。

所有的进程都收到 marker 信息并且记录下自己的状态和 channel 的状态(包含的 message)。

Flink 的分布式异步快照实现了Chandy Lamport 算法,其核心思想是 在 source 插入 barrier 代替 Chandy-Lamport 算法中的 marker,通过控制 barrier 的同步来实现 snapshot 的备份和 Exactly-Once 语义

Checkpoint Coordinator 向所有 source 节点 trigger Checkpoint。

source task向下游广播barrier。

当source task备份完自己的状态后,会将备份数据的地址(state handle)通知 Checkpoint Coordinator。

map和sink task收集齐上游source的barrier n,执行本地快照。下面例子是RocksDB增量Checkpoint 的流程:首先RocksDB会全量保存到磁盘上(红色大三角表示),然后Flink会从中选择没有上传的文件进行持久化备份(紫色小三角)。

map和sink task在完成Checkpoint 之后,将状态地址state handle返回通知 Coordinator。

当Checkpoint Coordinator收到全部task的state handle,就确定该Checkpoint已完成,并向持久化存储中备份一个Checkpoint Meta(元数据,包括该checkpoint状态数据的备份地址)。

‘贰’ flink配置和内存

1.Hosts and Ports
metrics.internal.query-service.port "0" String Accepts a list of ports (“50100,50101”), ranges(“50100-50200”) or a combination of both.
rest.bind-address (none) String
rest.bind-port "8081" String
taskmanager.data.port 0 Integer 任务管理器的外部端口,用于数据交换操作。
taskmanager.host (none) String
taskmanager.rpc.port "0" String
2.Fault Tolerance
restart-strategy 重启策略 默认空
restart-strategy.fixed-delay.attempts 固定周期重启
restart-strategy.fixed-delay.delay 固定周期重启
restart-strategy.failure-rate.delay 失败率重启
restart-strategy.failure-rate.failure-rate-interval 失败率重启
restart-strategy.failure-rate.max-failures-per-interval 失败率重启

state.backend.incremental 增量checkpoint(仅对rocksdb支持)
state.backend.local-recovery 状态后端配置本地恢复,默认false,只支持键控状态后端
state.checkpoints.num-retained 保留的最大已完成检查点数,默认1
taskmanager.state.local.root-dirs 本地恢复的根目录
3.High Availability
high-availability 默认为NONE To enable high-availability, set this mode to "ZOOKEEPER" or specify FQN of factory class.
high-availability.cluster-id 高可用flink集群ID
high-availability.storageDir 元数据路径

high-availability.zookeeper.path.root 配置ZK路径
high-availability.zookeeper.quorum 配置ZK集群
4.Memory Configuration
在大多数情况下,用户只需要设置值taskmanager.memory.process.size或taskmanager.memory.flink.size(取决于设置方式),并可能通过调整JVM堆与托管内存的比率taskmanager.memory.managed.fraction。
jobmanager.memory.enable-jvm-direct-memory-limit 是否启用jm进程的JVM直接内存限制,默认false
jobmanager.memory.flink.size 默认none。这包括JobManager消耗的所有内存。非容器配置
jobmanager.memory.heap.size 默认none。默认为总内存减去JVM,network,managed等
jobmanager.memory.jvm-metaspace.size jm的元空间大小,默认256mb
jobmanager.memory.jvm-overhead.fraction jm保留总进程内存的比例,默认0.1
jobmanager.memory.jvm-overhead.max 最大JVM开销,默认1gb
jobmanager.memory.jvm-overhead.min 最小JVM开销,默认192mb
jobmanager.memory.off-heap.size jm的堆外内存,默认128mb,如果第一个参数被启用,这个将生效
jobmanager.memory.process.size JobManager的总进程内存大小。容器化配置这个,为容器总大小
taskmanager.memory.flink.size TaskExecutor的总Flink内存大小。默认none,非容器配置
taskmanager.memory.framework.heap.size TaskExecutor的框架堆内存大小。默认128mb
taskmanager.memory.framework.off-heap.size TaskExecutor的框架堆外内存大小。默认128mb
taskmanager.memory.jvm-metaspace.size TaskExecutor的JVM元空间大小。默认256mb
taskmanager.memory.jvm-overhead.fraction 要为JVM开销保留的总进程内存的分数。默认0.1
taskmanager.memory.jvm-overhead.max TaskExecutor的最大JVM开销最大大小,默认1gb
taskmanager.memory.jvm-overhead.min TaskExecutor的最小JVM开销大小。默认192mb
taskmanager.memory.managed.consumer-weights 消费者权重。DATAPROC(用于流式RocksDB状态后端和批量内置算法)和PYTHON(用于Python进程),默认DATAPROC:70,PYTHON:30
taskmanager.memory.managed.fraction 如果未显式指定托管内存大小,则将用作托管内存的Flink总内存的分数,默认0.4
taskmanager.memory.managed.size TaskExecutor的托管内存大小。默认none
taskmanager.memory.network.fraction 用作网络内存的总Flink内存的分数,默认0.1
taskmanager.memory.network.max TaskExecutor的最大网络内存大小。默认1gb
taskmanager.memory.network.min TaskExecutor的最小网络内存大小。默认64mb
taskmanager.memory.process.size TaskExecutor的总进程内存大小。默认none,容器配置
taskmanager.memory.task.heap.size tm内存,默认none,
taskmanager.memory.task.off-heap.size tm堆外内存,默认0
5.Miscellaneous Options
fs.allowed-fallback-filesystems none
fs.default-scheme none
io.tmp.dirs 'LOCAL_DIRS' on Yarn. '_FLINK_TMP_DIR' on Mesos. System.getProperty("java.io.tmpdir") in standalone.

https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/memory/mem_setup_tm.html

‘叁’ [Flink State] State究竟保存在哪里

从源码解析State的保存过程 ,上一篇从task和operator出发说明了保存state的过程,到最后是由算子调用snapshot方法,进行state的快照操作。那么state究竟保存在哪里?

<1> State Backend简介

关于 Raw Bytes Storage and Backends

总结: StateBackend主要是针对raw bytes storage(即checkpoint),keyed state和operator state来提供功能的,其中checkpoint数据的存储则是通过CheckpointStreamFactory,而state存储,针对keyedState是通过AbstractKeyedStateBackend,针对operatorState是通过OperatorStateBackend。

<2>

RocksDBStateBackend的构造函数可以传入一个AbstractStateBackend,否则默认采用FsStateBackend。

可以看到,从OperatorState的角度来讲,目前Flink只有一个实现,即DefaultOperatorStateBackend,它将List风格的State保存在内存中。
从KeyedState的角度来讲,目前有两种实现,HeapKeyedStateBackend将state保存在内存中,而RocksDbKeyedStateBackend将State保存在TM本地的RocksDB中。相对而言,前者在内存中,速度会快,效率高,但一方面会限制state的大小,另一方面也会造成JVM自己的内存问题;后者在本地文件中,就会涉及序列化和反序列化,效率不及前者,但可以保存的state的大小会很大。

从checkpoint和savepoint的角度来看,Memory工厂方法都保存在内存中,显然不能在生产环境使用,而Fs工厂方法和RocksDb工厂方法,则统一都放在文件系统中,比如HDFS。

从上图中3,4两行可以看到,具体用来存储state的有三种HeapKeyedStateBackend,RocksDBKeyedStateBackend和DefaultOperatorStateBackend。

DefaultOperatorStateBackend
(1)

operator的ListSate的实现类PartitionableListState, OperatorState都保存在内存中,本质上还是一个ArrayList。

(2)snapshot方法

该snapshotStrategy是AbstractSnapshotStrategy<OperatorStateHandle>,而AbstractSnapshotStrategy有三种实现类:

中的snapshot方法:
该snapshot方法中,主要是对 registeredOperatorStates 和 registeredBroadcastStates 的snapshot。

第一步:

针对所有注册的state进行deepCopy,为了防止在checkpoint的时候数据结构又被修改,deepCopy其实是通过序列化和反序列化的过程;

第二步:
异步写入State和MetaInfo,先创建CheckpointStateOutputStream,通过调用factory的方法,这个factory是哪种类型的呢?这个是由定义的状态后端所决定的。之后会返回相应的OperatorstateHandle用作restore的过程。

第三步:
在StreamTask触发checkpoint的时候会将一个Task中所有的operator触发一次snapshot,触发部分就是上面1,2两个步骤,其中第二步是会返回一个RunnableFuture,在触发之后会提交一个AsyncSnapshotCallable异步任务,会阻塞一直等到checkpoint的Future,其实就是去调用这个方法AbstractAsyncIOCallable, 直到完成之后OperatorState会返回一个OperatorStateHandle,这个地方和后文的keyedState返回的handle不一样。

‘肆’ Flink基础系列28-Flink容错机制

在执行流应用程序期间,Flink 会定期保存状态的一致检查点

如果发生故障, Flink 将会使用最近的检查点来一致恢复应用程序的状态,并重新启动处理流程

(如图中所示,7这个数据被source读到了,准备传给奇数流时,奇数流宕机了,数据传输发生中断)

遇到故障之后,第一步就是重启应用

(重启后,起初流都是空的)

第二步是从 checkpoint 中读取状态,将状态重置

(读取在远程仓库(Storage,这里的仓库指状态后端保存数据指定的三种方式之一)保存的状态)

从检查点重新启动应用程序后,其内部状态与检查点完成时的状态完全相同

第三步:开始消费并处理检查点到发生故障之间的所有数据

这种检查点的保存和恢复机制可以为应用程序状态提供“精确一次”(exactly-once)的一致性,因为所有算子都会保存检查点并恢复其所有状态,这样一来所有的输入流就都会被重置到检查点完成时的位置

(这里要求source源也能记录状态,回退到读取数据7的状态,kafka有相应的偏移指针能完成该操作)

概述
checkpoint和Watermark一样,都会以广播的形式告诉所有下游。

具体讲解

JobManager 会向每个 source 任务发送一条带有新检查点 ID 的消息,通过这种方式来启动检查点

(这个带有新检查点ID的东西为barrier,由图中三角型表示,数值2只是ID)

数据源将它们的状态写入检查点,并发出一个检查点barrier
状态后端在状态存入检查点之后,会返回通知给source任务,source任务就会向JobManager确认检查点完成
上图,在Source端接受到barrier后,将自己此身的3 和 4 的数据的状态写入检查点,且向JobManager发送checkpoint成功的消息,然后向下游分别发出一个检查点 barrier

可以看出在Source接受barrier时,数据流也在不断的处理,不会进行中断

此时的偶数流已经处理完蓝2变成了4,但是还没处理到黄4,只是下游sink发送了一个数据4,而奇数流已经处理完蓝3变成了8(黄1+蓝1+黄3+蓝3),并向下游sink发送了8

此时检查点barrier都还未到Sum_odd奇数流和Sum_even偶数流

分界线对齐:barrier向下游传递,sum任务会等待所有输入分区的barrier到达
对于barrier已经达到的分区,继续到达的数据会被缓存
而barrier尚未到达的分区,数据会被正常处理
此时蓝色流的barrier先一步抵达了偶数流,黄色的barrier还未到,但是因为数据的不中断一直处理,此时的先到的蓝色的barrier会将此时的偶数流的数据4进行缓存处理,流接着处理接下来的数据等待着黄色的barrier的到来,而黄色barrier之前的数据将会对缓存的数据相加

这次处理的总结:分界线对齐:barrier 向下游传递,sum 任务会等待所有输入分区的 barrier 到达,对于barrier已经到达的分区,继续到达的数据会被缓存。而barrier尚未到达的分区,数据会被正常处理

当收到所有输入分区的 barrier 时,任务就将其状态保存到状态后端的检查点中,然后将 barrier 继续向下游转发
当蓝色的barrier和黄色的barrier(所有分区的)都到达后,进行状态保存到远程仓库,然后对JobManager发送消息,说自己的检查点保存完毕了

此时的偶数流和奇数流都为8

向下游转发检查点 barrier 后,任务继续正常的数据处理

Sink 任务向 JobManager 确认状态保存到 checkpoint 完毕
当所有任务都确认已成功将状态保存到检查点时,检查点就真正完成了

CheckPoint为自动保存,SavePoint为手动保存

有状态的流处理,内部每个算子任务都可以有自己的状态

对于流处理器内部来说,所谓的状态一致性,其实就是我们所说的计算结果要保证准确。

一条数据不应该丢失,也不应该重复计算

在遇到故障时可以恢复状态,恢复以后的重新计算,结果应该也是完全正确的。

Flink的一个重大价值在于,它既保证了exactly-once,也具有低延迟和高吞吐的处理能力。

Flink使用了一种轻量级快照机制——检查点(checkpoint)来保证exactly-once语义
有状态流应用的一致检查点,其实就是:所有任务的状态,在某个时间点的一份备份(一份快照)。而这个时间点,应该是所有任务都恰好处理完一个相同的输入数据的时间。
应用状态的一致检查点,是Flink故障恢复机制的核心

端到端(end-to-end)状态一致性
目前我们看到的一致性保证都是由流处理器实现的,也就是说都是在Flink流处理器内部保证的;而在真实应用中,流处理应用除了流处理器以外还包含了数据源(例如Kafka)和输出到持久化系统

端到端的一致性保证,意味着结果的正确性贯穿了整个流处理应用的始终;每一个组件都保证了它自己的一致性

整个端到端的一致性级别取决于所有组件中一致性最弱的组件

端到端 exactly-once

幂等写入
所谓幂等操作,是说一个操作,可以重复执行很多次,但只导致一次结果更改,也就是说,后面再重复执行就不起作用了。

(中间可能会存在不正确的情况,只能保证最后结果正确。比如5=>10=>15=>5=>10=>15,虽然最后是恢复到了15,但是中间有个恢复的过程,如果这个过程能够被读取,就会出问题。)

事务写入

预写日志(Write-Ahead-Log,WAL)
把结果数据先当成状态保存,然后在收到checkpoint完成的通知时,一次性写入sink系统
简单易于实现,由于数据提前在状态后端中做了缓存,所以无论什么sink系统,都能用这种方式一批搞定
DataStream API提供了一个模版类:GenericWriteAheadSink,来实现这种事务性sink

两阶段提交(Two-Phase-Commit,2PC)
对于每个checkpoint,sink任务会启动一个事务,并将接下来所有接收到的数据添加到事务里
然后将这些数据写入外部sink系统,但不提交它们——这时只是"预提交"
这种方式真正实现了exactly-once,它需要一个提供事务支持的外部sink系统。Flink提供了TwoPhaseCommitSinkFunction接口

不同Source和Sink的一致性保证

内部——利用checkpoint机制,把状态存盘,发生故障的时候可以恢复,保证内部的状态一致性
source——kafka consumer作为source,可以将偏移量保存下来,如果后续任务出现了故障,恢复的时候可以由连接器重制偏移量,重新消费数据,保证一致性
sink——kafka procer作为sink,采用两阶段提交sink,需要实现一个TwoPhaseCommitSinkFunction

Exactly-once 两阶段提交

JobManager 协调各个 TaskManager 进行 checkpoint 存储
checkpoint保存在 StateBackend中,默认StateBackend是内存级的,也可以改为文件级的进行持久化保存

当 checkpoint 启动时,JobManager 会将检查点分界线(barrier)注入数据流
barrier会在算子间传递下去

每个算子会对当前的状态做个快照,保存到状态后端
checkpoint 机制可以保证内部的状态一致性

每个内部的 transform 任务遇到 barrier 时,都会把状态存到 checkpoint 里

sink 任务首先把数据写入外部 kafka,这些数据都属于预提交的事务;遇到 barrier 时,把状态保存到状态后端,并开启新的预提交事务

(barrier之前的数据还是在之前的事务中没关闭事务,遇到barrier后的数据另外新开启一个事务)

当所有算子任务的快照完成,也就是这次的 checkpoint 完成时,JobManager 会向所有任务发通知,确认这次 checkpoint 完成
sink 任务收到确认通知,正式提交之前的事务,kafka 中未确认数据改为“已确认”

Exactly-once 两阶段提交步骤总结

‘伍’ Flink 原理详解

Flink 是一个流处理框架,支持流处理和批处理,特点是流处理有限,可容错,可扩展,高吞吐,低延迟。

流处理是处理一条,立马下一个节点会从缓存中取出,在下一个节点进行计算

批处理是只有处理一批完成后,才会经过网络传输到下一个节点

流处理的优点是低延迟 批处理的优点是高吞吐

flink同时支持两种,flink的网络传输是设计固定的缓存块为单位,用户可以设置缓存块的超时值来决定换存块什么时候进行传输。 数据大于0 进行处理就是流式处理。
如果设置为无限大就是批处理模型。

Flink 集群包括 JobManager 和 TaskManager .

JobManager 主要负责调度 Job 并协调 Task 做 checkpoint,职责上很像 Storm 的 Nimbus。从 Client 处接收到 Job 和 JAR 包 等资源后,会生成优化后的执行计划,并以 Task 的单元调度到各个 TaskManager 去执行。

TaskManager 在启动的时候就设置好了槽位数(Slot),每个 slot 能启动一个 Task,Task 为线程。从 JobManager 处接收需要 部署的 Task,部署启动后,与自己的上游建立 Netty 连接,接收数据并处理。

flink on yarn 是由client 提交 app到 RM 上, 然后RM 分配一个 AppMaster负责运行 Flink JobManager 和 Yarn AppMaster, 然后 AppMaster 分配 容器去运行 Flink TaskManger

SparkStreaming 是将流处理分成微批处理的作业, 最后的处理引擎是spark job

Spark Streaming把实时输入数据流以时间片Δt (如1秒)为单位切分成块,Spark Streaming会把每块数据作为一个RDD,并使用RDD操作处理每一小块数据。每个块都会生成一个Spark Job处理,然后分批次提交job到集群中去运行,运行每个 job的过程和真正的spark 任务没有任何区别。

JobScheler, 负责 Job的调度通过定时器每隔一段时间根据Dstream的依赖关系生一个一个DAG图

ReceiverTracker负责数据的接收,管理和分配
ReceiverTracker在启动Receiver的时候他有ReceiverSupervisor,其实现是ReceiverSupervisorImpl, ReceiverSupervisor本身启 动的时候会启动Receiver,Receiver不断的接收数据,通过BlockGenerator将数据转换成Block。定时器会不断的把Block数据通会不断的把Block数据通过BlockManager或者WAL进行存储,数据存储之后ReceiverSupervisorlmpl会把存储后的数据的元数据Metadate汇报给ReceiverTracker,其实是汇报给ReceiverTracker中的RPC实体ReceiverTrackerEndpoin

spark on yarn 的cluster模式, Spark client 向RM提交job请求, RM会分配一个 AppMaster, driver 和 运行在AppMAster节点里, AM然后把Receiver作为一个Task提交给Spark Executor 节点, Receive启动接受数据,生成数据块,并通知Spark Appmaster, AM会根据数据块生成相应的Job, 并把Job 提交给空闲的 Executor 去执行。

1:需要关注流数据是否需要进行状态管理
2:At-least-once或者Exectly-once消息投递模式是否有特殊要求
3:对于小型独立的项目,并且需要低延迟的场景,建议使用storm
4:如果你的项目已经使用了spark,并且秒级别的实时处理可以满足需求的话,建议使用sparkStreaming
5:要求消息投递语义为 Exactly Once 的场景;数据量较大,要求高吞吐低延迟的场景;需要进行状态管理或窗口统计的场景,建议使用flink

Flink 提供的Api右 DataStream 和 DataSet ,他们都是不可变的数据集合,不可以增加删除中的元素, 通过 Source 创建 DataStream 和 DataSet

在创建运行时有:

Flink的每一个Operator称为一个任务, Operator 的每一个实例称为子任务,每一个任务在JVM线程中执行。可以将多个子任务链接成一个任务,减少上下文切换的开销,降低延迟。

source 和 算子map 如果是 one by one 的关系,他们的数据交换可以通过缓存而不是网络通信

TaskManager 为控制执行任务的数量,将计算资源划分多个slot,每个slot独享计算资源,这种静态分配利于任务资源隔离。

同一个任务可以共享一个slot, 不同作业不可以。

这里因为 Source 和 Map 并行度都是4 采用直连方式,他们的数据通信采用缓存形式

所以一共需要两个TaskManager source,Map 一个,rece一个, 每个TaskManager 要3个slot

JobManager 将 JobGraph 部署 ExecutionGraph

设置的并行度,可以让一个ExecJobVertex 对应 多个并行的ExecVertex 实例。

Flink通过状态机管理 ExecGraph的作业执行进度。

Flink 将对象序列化为固定数量的预先分配的内存段,而不是直接把对象放在堆内存上。
Flink TaskManager 是由几个内部组件组成的:actor 系统(负责与 Flink master 协调)、IOManager(负责将数据溢出到磁盘并将其读取回来)、MemoryManager(负责协调内存使用。

数据源:

Sink:

时间:

处理时间:取自Operator的机器系统时间

事件时间: 由数据源产生

进入时间: 被Source节点观察时的系统时间

如果数据源没有自己正确创建水印,程序必须自己生成水印来确保基于事件的时间窗口可以正常工作。。

DataStream 提供了 周期性水印,间歇式水印,和递增式水印

‘陆’ Flink:特性、概念、组件栈、架构及原理分析

简单之美 | Apache Flink:特性、概念、组件栈、架构及原理分析
http://shiyanjun.cn/archives/1508.html

Apache Flink是一个面向分布式数据流处理和批量数据处理的开源计算平台,它能够基于同一个Flink运行时(Flink Runtime),提供支持流处理和批处理两种类型应用的功能。现有的开源计算方案,会把流处理和批处理作为两种不同的应用类型,因为他们它们所提供的SLA是完全不相同的:流处理一般需要支持低延迟、Exactly-once保证,而批处理需要支持高吞吐、高效处理,所以在实现的时候通常是分别给出两套实现方法,或者通过一个独立的开源框架来实现其中每一种处理方案。例如,实现批处理的开源方案有MapRece、Tez、Crunch、Spark,实现流处理的开源方案有Samza、Storm。Flink在实现流处理和批处理时,与传统的一些方案完全不同,它从另一个视角看待流处理和批处理,将二者统一起来:Flink是完全支持流处理,也就是说作为流处理看待时输入数据流是无界的;批处理被作为一种特殊的流处理,只是它的输入数据流被定义为有界的。基于同一个Flink运行时(Flink Runtime),分别提供了流处理和批处理API,而这两种API也是实现上层面向流处理、批处理类型应用框架的基础。
基本特性
关于Flink所支持的特性,我这里只是通过分类的方式简单做一下梳理,涉及到具体的一些概念及其原理会在后面的部分做详细说明。
流处理特性
支持高吞吐、低延迟、高性能的流处理
支持带有事件时间的窗口(Window)操作
支持有状态计算的Exactly-once语义
支持高度灵活的窗口(Window)操作,支持基于time、count、session,以及data-driven的窗口操作
支持具有Backpressure功能的持续流模型
支持基于轻量级分布式快照(Snapshot)实现的容错
一个运行时同时支持Batch on Streaming处理和Streaming处理
Flink在JVM内部实现了自己的内存管理
支持迭代计算
支持程序自动优化:避免特定情况下Shuffle、排序等昂贵操作,中间结果有必要进行缓存

API支持
对Streaming数据类应用,提供DataStream API
对批处理类应用,提供DataSet API(支持Java/Scala)

Libraries支持
支持机器学习(FlinkML)
支持图分析(Gelly)
支持关系数据处理(Table)
支持复杂事件处理(CEP)

整合支持
支持Flink on YARN
支持HDFS
支持来自Kafka的输入数据
支持Apache HBase
支持Hadoop程序
支持Tachyon
支持ElasticSearch
支持RabbitMQ
支持Apache Storm
支持S3
支持XtreemFS

基本概念
Stream & Transformation & Operator
用户实现的Flink程序是由Stream和Transformation这两个基本构建块组成,其中Stream是一个中间结果数据,而Transformation是一个操作,它对一个或多个输入Stream进行计算处理,输出一个或多个结果Stream。当一个Flink程序被执行的时候,它会被映射为Streaming Dataflow。一个Streaming Dataflow是由一组Stream和Transformation Operator组成,它类似于一个DAG图,在启动的时候从一个或多个Source Operator开始,结束于一个或多个Sink Operator。下面是一个由Flink程序映射为Streaming Dataflow的示意图,如下所示:

比如从Source[1]到map()[1],它保持了Source的分区特性(Partitioning)和分区内元素处理的有序性,也就是说map()[1]的Subtask看到数据流中记录的顺序,与Source[1]中看到的记录顺序是一致的。
Redistribution模式

这种模式改变了输入数据流的分区,比如从map()[1]、map()[2]到keyBy()/window()/apply()[1]、keyBy()/window()/apply()[2],上游的Subtask向下游的多个不同的Subtask发送数据,改变了数据流的分区,这与实际应用所选择的Operator有关系。另外,Source Operator对应2个Subtask,所以并行度为2,而Sink Operator的Subtask只有1个,故而并行度为1。
Task & Operator Chain
在Flink分布式执行环境中,会将多个Operator Subtask串起来组成一个Operator Chain,实际上就是一个执行链,每个执行链会在TaskManager上一个独立的线程中执行,如下图所示:

在Flink集群启动的时候,TaskManager会向JobManager注册,如果注册成功,则JobManager会向TaskManager回复消息AcknowledgeRegistration。
SubmitJob

Flink程序内部通过Client向JobManager提交Flink Job,其中在消息SubmitJob中以JobGraph形式描述了Job的基本信息。
CancelJob

请求取消一个Flink Job的执行,CancelJob消息中包含了Job的ID,如果成功则返回消息CancellationSuccess,失败则返回消息CancellationFailure。
UpdateTaskExecutionState

TaskManager会向JobManager请求更新ExecutionGraph中的ExecutionVertex的状态信息,更新成功则返回true。
RequestNextInputSplit

运行在TaskManager上面的Task,请求获取下一个要处理的输入Split,成功则返回NextInputSplit。
JobStatusChanged

ExecutionGraph向JobManager发送该消息,用来表示Flink Job的状态发生的变化,例如:RUNNING、CANCELING、FINISHED等。
TaskManager
TaskManager也是一个Actor,它是实际负责执行计算的Worker,在其上执行Flink Job的一组Task。每个TaskManager负责管理其所在节点上的资源信息,如内存、磁盘、网络,在启动的时候将资源的状态向JobManager汇报。TaskManager端可以分成两个阶段:
注册阶段

TaskManager会向JobManager注册,发送RegisterTaskManager消息,等待JobManager返回AcknowledgeRegistration,然后TaskManager就可以进行初始化过程。
可操作阶段

该阶段TaskManager可以接收并处理与Task有关的消息,如SubmitTask、CancelTask、FailTask。如果TaskManager无法连接到JobManager,这是TaskManager就失去了与JobManager的联系,会自动进入“注册阶段”,只有完成注册才能继续处理Task相关的消息。
Client
当用户提交一个Flink程序时,会首先创建一个Client,该Client首先会对用户提交的Flink程序进行预处理,并提交到Flink集群中处理,所以Client需要从用户提交的Flink程序配置中获取JobManager的地址,并建立到JobManager的连接,将Flink Job提交给JobManager。Client会将用户提交的Flink程序组装一个JobGraph, 并且是以JobGraph的形式提交的。一个JobGraph是一个Flink Dataflow,它由多个JobVertex组成的DAG。其中,一个JobGraph包含了一个Flink程序的如下信息:JobID、Job名称、配置信息、一组JobVertex等。
组件栈
Flink是一个分层架构的系统,每一层所包含的组件都提供了特定的抽象,用来服务于上层组件。Flink分层的组件栈如下图所示:

了解YARN的话,对上图的原理非常熟悉,实际Flink也实现了满足在YARN集群上运行的各个组件:Flink YARN Client负责与YARN RM通信协商资源请求,Flink JobManager和Flink TaskManager分别申请到Container去运行各自的进程。通过上图可以看到,YARN AM与Flink JobManager在同一个Container中,这样AM可以知道Flink JobManager的地址,从而AM可以申请Container去启动Flink TaskManager。待Flink成功运行在YARN集群上,Flink YARN Client就可以提交Flink Job到Flink JobManager,并进行后续的映射、调度和计算处理。
Runtime层

Runtime层提供了支持Flink计算的全部核心实现,比如:支持分布式Stream处理、JobGraph到ExecutionGraph的映射、调度等等,为上层API层提供基础服务。
API层

API层主要实现了面向无界Stream的流处理和面向Batch的批处理API,其中面向流处理对应DataStream API,面向批处理对应DataSet API。
Libraries层

该层也可以称为Flink应用框架层,根据API层的划分,在API层之上构建的满足特定应用的实现计算框架,也分别对应于面向流处理和面向批处理两类。面向流处理支持:CEP(复杂事件处理)、基于sql-like的操作(基于Table的关系操作);面向批处理支持:FlinkML(机器学习库)、Gelly(图处理)。
内部原理
容错机制
Flink基于Checkpoint机制实现容错,它的原理是不断地生成分布式Streaming数据流Snapshot。在流处理失败时,通过这些Snapshot可以恢复数据流处理。理解Flink的容错机制,首先需要了解一下Barrier这个概念:Stream Barrier是Flink分布式Snapshotting中的核心元素,它会作为数据流的记录被同等看待,被插入到数据流中,将数据流中记录的进行分组,并沿着数据流的方向向前推进。每个Barrier会携带一个Snapshot ID,属于该Snapshot的记录会被推向该Barrier的前方。因为Barrier非常轻量,所以并不会中断数据流。带有Barrier的数据流,如下图所示:

接收到Barrier n的Stream被临时搁置,来自这些Stream的记录不会被处理,而是被放在一个Buffer中
一旦最后一个Stream接收到Barrier n,Operator会emit所有暂存在Buffer中的记录,然后向Checkpoint Coordinator发送Snapshot n
继续处理来自多个Stream的记录

基于Stream Aligning操作能够实现Exactly Once语义,但是也会给流处理应用带来延迟,因为为了排列对齐Barrier,会暂时缓存一部分Stream的记录到Buffer中,尤其是在数据流并行度很高的场景下可能更加明显,通常以最迟对齐Barrier的一个Stream为处理Buffer中缓存记录的时刻点。在Flink中,提供了一个开关,选择是否使用Stream Aligning,如果关掉则Exactly Once会变成At least once。
调度机制
在JobManager端,会接收到Client提交的JobGraph形式的Flink Job,JobManager会将一个JobGraph转换映射为一个ExecutionGraph,如下图所示:

迭代机制
机器学习和图计算应用,都会使用到迭代计算,Flink通过在迭代Operator中定义Step函数来实现迭代算法,这种迭代算法包括Iterate和Delta Iterate两种类型,在实现上它们反复地在当前迭代状态上调用Step函数,直到满足给定的条件才会停止迭代。下面,对Iterate和Delta Iterate两种类型的迭代算法原理进行说明:
Iterate

Iterate Operator是一种简单的迭代形式:每一轮迭代,Step函数的输入或者是输入的整个数据集,或者是上一轮迭代的结果,通过该轮迭代计算出下一轮计算所需要的输入(也称为Next Partial Solution),满足迭代的终止条件后,会输出最终迭代结果,具体执行流程如下图所示:

Delta Iterate Operator实现了增量迭代,它的实现原理如下图所示:

另外,Flink还提供了3个参数来配置Backpressure监控行为:
参数名称
默认值
说明

jobmanager.web.backpressure.refresh-interval
60000
默认1分钟,表示采样统计结果刷新时间间隔

jobmanager.web.backpressure.num-samples
100
评估Backpressure状态,所使用的堆栈跟踪调用次数

jobmanager.web.backpressure.delay-between-samples
50
默认50毫秒,表示对一个Job的每个Task依次调用的时间间隔

通过上面个定义的Backpressure状态,以及调整相应的参数,可以确定当前运行的Job的状态是否正常,并且保证不影响JobManager提供服务。
参考链接
http://flink.apache.org/
http://flink.apache.org/features.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/general_arch.html
http://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/
https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/stream_checkpointing.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/job_scheling.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/event_time.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/yarn_setup.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/jobmanager_high_availability.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/libs/cep.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/libs/gelly.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/libs/ml/index.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/libs/table.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/connectors.html
http://geek.csdn.net/news/detail/56272
http://samza.apache.org/

‘柒’ Flink内存管理

java所有数据类型对应的字节大小

java对象的组成 : 对象头,实例数据,对齐部分

jvm 序列化缺点

上面图为TaskManager内存模型,左边为细分的内存模型,右边为整体内存模型,该图摘自Flink官网

heap内存在jvm启动的时候申请的一块不变的内存区域,该内存实际上是Flink和task公用的一块区域,在flink层面通过控制来区分框架使用和task内存,heap内存管理起来是比较容易的,实际上non-heap的内存是难管理的一块,如果管理不当或者使用不当可能造成内存泄漏或者内存无限增长等问题

内存参数配置

在flink中对内存进行了抽象成了MemorySegment,�默认情况下,一个 MemorySegment 对应着一个 32KB 大小的内存块,这块内存既可以是堆上内存( byte数组) ,也可以是堆外内存(nio的ByteBufferr ) .

同时MemorySegment也提供了对二进制数据的操作方法,以及读取字节数组序列化以及序列化字节数组的方法等

下面是类继承图,该类有两MemorySegment实现类有两个分别为使用heap的以及混合的即有heap和non-heap,对于内存的访问有子类具体的实现

MemorySemgent是flink内存分配的最小单元了,对于数据夸MemorySemgent保存,那么对于上层的使用者来说,需要考虑考虑所有的细节,由于过于繁琐,所以在MemorySemgent上又抽象了一层内存也,内存也是在MemorySemgent数据访问上的视图,对数据输入和输出分别抽象为DataInputView/DataOutputView,有了这一层,上层使用者无需关心跨MemorySemgent的细节问题,内存也对自动处理跨MemorySemgent的内存操作

DataInputView

DataInputView继承DataInput,DataInputView是对MemorySemgent读取的抽象视图,提供一系列读取二进制数据不同类型的方法,AbstractPageInputView是DataInputView的一个抽象实现类,并且基本所有InputView都实现了该类,即所有实现该类的InputView都支持Page

InputView持有了多个MemorySemgent的引用(可以基于数组,list,deque等),这些MemorySemgent被视为一个内存页,可以顺序,随机等方式读取数据,要基于不同的实现类,实现类不同读取方式不同

方法图

DataOutputView

与DataInputView相对应,继承Output,并有一个拥有Page功能的抽象类(AbstractPagedOutputView),其大部outputView的实现都是继承自该抽象类,对一组MemorySemgent提供一个基于页的写入功能

方法图

类继承图

用于网络io数据的包装,每个buffer持有一个MemorySegment的引用,resultPartition写数据的时候,会向LocalBufferPool申请Buffer,会返回BufferBuilder,通过BufferBuilder想Buffe r<实际写入的是MemorySegment> 写数据

BufferBuilder是在上游Task中,负责想Buffer写入数据,BufferConsumer位于下游,与BufferBuilder相对应,用于消费Buffer的数据,每个bufferBuilder对应一个bufferConsumer

常用参数介绍

buffer申请

buffer回收

当buffer用完之后需要进行回收比如在netty的clientHandler收到响应之后进行处理就会把buffer回收掉,buffer回收之后并不会释放memorySegment,而是放回池中,变为可用内存,反复使用

flink托管的内存,托管内存使用堆外内存,用于批处理缓存排序等以及提供rocksDB内存

NetworkBufferPool是一个固定大小的MemorySegment实例吃,用于网络栈中,NettyBufferPool会为每个ResultPartition创建属于自己的LocalBufferPool,NettyBufferPool会作为全局的pool来提供内存,LocalBufferPool会通过限制来控制自己内存的申请,防止过多申请

LocalBufferPool继承关系,实现了bufferRecycler的接口,用于回收自己持有的buffer

在数据接收的时候会将数据封装成NettyBuffer,在数据发送的时候会通过BufferBilder向MemorySegment写入数据,然后通过BufferConsumer读取MemorySegment的数据

BufferManager主要用于为RemoteInputChannel提供buffer的,bufferManager在启动的时候会向全局bufferPool请求自己的独有buffer,当bufferManager的buffer不够的时候,则会向localBufferPool请求buffer,此时请求的buffer为浮动buffer

实际上提供的buffer是

‘捌’ flink 窗口数据存储在哪里

窗口数据信息一般会存储在C盘。
因为这样的数据信息属于运行缓存数据信息,它会在C盘进行运行的存储应用。

‘玖’ 基于flink sql构建实时数据仓库

根据目前大数据这一块的发展,已经不局限于离线的分析,挖掘数据潜在的价值,数据的时效性最近几年变得刚需,实时处理的框架有storm,spark-streaming,flink等。想要做到实时数据这个方案可行,需要考虑以下几点:1、状态机制 2、精确一次语义 3、高吞吐量 4、可弹性伸缩的应用 5、容错机制,刚好这几点,flink都完美的实现了,并且支持flink sql高级API,减少了开发成本,可用实现快速迭代,易维护等优点。

离线数仓的架构图:

实时数仓架构图:

目前是将实时维度表和DM层数据存于hbase当中,实时公共层都存于kafka当中,并且以写滚动日志的方式写入HDFS(主要是用于校验数据)。其实在这里可以做的工作还有很多,kafka集群,flink集群,hbase集群相互独立,这对整个实时数据仓库的稳定性带来一定的挑战。

一个数据仓库想要成体系,成资产,离不开数据域的划分。所以参考着离线的数据仓库,想着在实时数仓做出这方面的探索,理论上来讲,离线可以实现的,实时也是可以实现的。 并且目前已经取得了成效,目前划分的数据域跟离线大致相同,有流量域,交易域,营销域等等。当然这里面涉及到维表,多事务事实表,累计快照表,周期性快照表的设计,开发,到落地这里就不详述了。

维度表也是整个实时数据仓库不可或缺的部分。从目前整个实时数仓的建设来看,维度表有着数据量大,但是变更少的特点,我们试想过构建全平台的实时商品维度表或者是实时会员维度表,但是这类维度表太过于复杂,所以针对这类维度表下面介绍。还有另外一种就是较为简单的维度表,这类维度可能对应着业务系统单个mysql表,或者只需要几个表进行简单ETL就可以产出的表,这类维表是可以做成实时的。以下有几个实施的关键点:

如下是离线数据同步架构图:

实时数据的接入其实在底层架构是一样的,就是从kafka那边开始不一样,实时用flink的UDTF进行解析,而离线是定时(目前是小时级)用camus拉到HDFS,然后定时load HDFS的数据到hive表里面去,这样来实现离线数据的接入。实时数据的接入是用flink解析kafka的数据,然后在次写入kafka当中去。

由于目前离线数据已经稳定运行了很久,所以实时接入数据的校验可以对比离线数据,但是离线数据是小时级的hive数据,实时数据存于kafka当中,直接比较不了,所以做了相关处理,将kafka的数据使用flink写HDFS滚动日志的形式写入HDFS,然后建立hive表小时级定时去load HDFS中的文件,以此来获取实时数据。

完成以上两点,剩余还需要考虑一点,都是小时级的任务,这个时间卡点使用什么字段呢?首先要确定一点就是离线和实时任务卡点的时间字段必须是一致的,不然肯定会出问题。目前离线使用camus从kafka将数据拉到HDFS上,小时级任务,使用nginx_ts这个时间字段来卡点,这个字段是上报到nginx服务器上记录的时间点。而实时的数据接入是使用flink消费kafka的数据,在以滚动日志的形式写入HDFS的,然后在建立hive表load HDFS文件获取数据,虽然这个hive也是天/小时二级分区,但是离线的表是根据nginx_ts来卡点分区,但是实时的hive表是根据任务启动去load文件的时间点去区分的分区,这是有区别的,直接筛选分区和离线的数据进行对比,会存在部分差异,应当的做法是筛选范围分区,然后在筛选nginx_ts的区间,这样在跟离线做对比才是合理的。

目前实时数据接入层的主要时延是在UDTF函数解析上,实时的UDTF函数是根据上报的日志格式进行开发的,可以完成日志的解析功能。

解析流程图如下:

解析速率图如下:

该图还不是在峰值数据量的时候截的,目前以800记录/second为准,大概一个记录的解析速率为1.25ms。
目前该任务的flink资源配置核心数为1,假设解析速率为1.25ms一条记录,那么峰值只能处理800条/second,如果数据接入速率超过该值就需要增加核心数,保证解析速率。

介绍一下目前离线维度表的情况,就拿商品维度表来说,全线记录数将近一个亿,计算逻辑来自40-50个ods层的数据表,计算逻辑相当复杂,如果实时维度表也参考离线维度表来完成的话,那么开发成本和维护成本非常大,对于技术来讲也是很大的一个挑战,并且目前也没有需求要求维度属性百分百准确。所以目前(伪实时维度表)准备在当天24点产出,当天的维度表给第二天实时公共层使用,即T-1的模式。伪实时维度表的计算逻辑参考离线维度表,但是为了保障在24点之前产出,需要简化一下离线计算逻辑,并且去除一些不常用的字段,保障伪实时维度表可以较快产出。

实时维度表的计算流程图:

目前使用flink作为公司主流的实时计算引擎,使用内存作为状态后端,并且固定30s的间隔做checkpoint,使用HDFS作为checkpoint的存储组件。并且checkpoint也是作为任务restart以后恢复状态的重要依据。熟悉flink的人应该晓得,使用内存作为状态后端,这个内存是JVM的堆内存,毕竟是有限的东西,使用不得当,OOM是常有的事情,下面就介绍一下针对有限的内存,如果完成常规的计算。

‘拾’ 转载:阿里巴巴为什么选择Apache Flink

本文主要整理自阿里巴巴计算平台事业部资深技术专家莫问在云栖大会的演讲。

合抱之木,生于毫末

随着人工智能时代的降临,数据量的爆发,在典型的大数据的业务场景下数据业务最通用的做法是:选用批处理的技术处理全量数据,采用流式计算处理实时增量数据。在绝大多数的业务场景之下,用户的业务逻辑在批处理和流处理之中往往是相同的。但是,用户用于批处理和流处理的两套计算引擎是不同的。

因此,用户通常需要写两套代码。毫无疑问,这带来了一些额外的负担和成本。阿里巴巴的商品数据处理就经常需要面对增量和全量两套不同的业务流程问题,所以阿里就在想,我们能不能有一套统一的大数据引擎技术,用户只需要根据自己的业务逻辑开发一套代码。这样在各种不同的场景下,不管是全量数据还是增量数据,亦或者实时处理,一套方案即可全部支持, 这就是阿里选择Flink的背景和初衷

目前开源大数据计算引擎有很多选择,流计算如Storm,Samza,Flink,Kafka Stream等,批处理如Spark,Hive,Pig,Flink等。而同时支持流处理和批处理的计算引擎,只有两种选择:一个是Apache Spark,一个是Apache Flink。

从技术,生态等各方面的综合考虑。首先,Spark的技术理念是基于批来模拟流的计算。而Flink则完全相反,它采用的是基于流计算来模拟批计算。

从技术发展方向看,用批来模拟流有一定的技术局限性,并且这个局限性可能很难突破。而Flink基于流来模拟批,在技术上有更好的扩展性。从长远来看,阿里决定用Flink做一个统一的、通用的大数据引擎作为未来的选型。

Flink是一个低延迟、高吞吐、统一的大数据计算引擎。在阿里巴巴的生产环境中,Flink的计算平台可以实现毫秒级的延迟情况下,每秒钟处理上亿次的消息或者事件。同时Flink提供了一个Exactly-once的一致性语义。保证了数据的正确性。这样就使得Flink大数据引擎可以提供金融级的数据处理能力。

Flink在阿里的现状

基于Apache Flink在阿里巴巴搭建的平台于2016年正式上线,并从阿里巴巴的搜索和推荐这两大场景开始实现。目前阿里巴巴所有的业务,包括阿里巴巴所有子公司都采用了基于Flink搭建的实时计算平台。同时Flink计算平台运行在开源的Hadoop集群之上。采用Hadoop的YARN做为资源管理调度,以 HDFS作为数据存储。因此,Flink可以和开源大数据软件Hadoop无缝对接。

目前,这套基于Flink搭建的实时计算平台不仅服务于阿里巴巴集团内部,而且通过阿里云的云产品API向整个开发者生态提供基于Flink的云产品支持。

Flink在阿里巴巴的大规模应用,表现如何?

规模: 一个系统是否成熟,规模是重要指标,Flink最初上线阿里巴巴只有数百台服务器,目前规模已达上万台,此等规模在全球范围内也是屈指可数;

状态数据: 基于Flink,内部积累起来的状态数据已经是PB级别规模;

Events: 如今每天在Flink的计算平台上,处理的数据已经超过万亿条;

PS: 在峰值期间可以承担每秒超过4.72亿次的访问,最典型的应用场景是阿里巴巴双11大屏;

Flink的发展之路

接下来从开源技术的角度,来谈一谈Apache Flink是如何诞生的,它是如何成长的?以及在成长的这个关键的时间点阿里是如何进入的?并对它做出了那些贡献和支持?

Flink诞生于欧洲的一个大数据研究项目StratoSphere。该项目是柏林工业大学的一个研究性项目。早期,Flink是做Batch计算的,但是在2014年,StratoSphere里面的核心成员孵化出Flink,同年将Flink捐赠Apache,并在后来成为Apache的顶级大数据项目,同时Flink计算的主流方向被定位为Streaming,即用流式计算来做所有大数据的计算,这就是Flink技术诞生的背景。

2014年Flink作为主攻流计算的大数据引擎开始在开源大数据行业内崭露头角。区别于Storm,Spark Streaming以及其他流式计算引擎的是:它不仅是一个高吞吐、低延迟的计算引擎,同时还提供很多高级的功能。比如它提供了有状态的计算,支持状态管理,支持强一致性的数据语义以及支持Event Time,WaterMark对消息乱序的处理。

Flink核心概念以及基本理念

Flink最区别于其他流计算引擎的,其实就是状态管理。

什么是状态?例如开发一套流计算的系统或者任务做数据处理,可能经常要对数据进行统计,如Sum,Count,Min,Max,这些值是需要存储的。因为要不断更新,这些值或者变量就可以理解为一种状态。如果数据源是在读取Kafka,RocketMQ,可能要记录读取到什么位置,并记录Offset,这些Offset变量都是要计算的状态。

Flink提供了内置的状态管理,可以把这些状态存储在Flink内部,而不需要把它存储在外部系统。这样做的好处是第一降低了计算引擎对外部系统的依赖以及部署,使运维更加简单;第二,对性能带来了极大的提升:如果通过外部去访问,如Redis,HBase它一定是通过网络及RPC。如果通过Flink内部去访问,它只通过自身的进程去访问这些变量。同时Flink会定期将这些状态做Checkpoint持久化,把Checkpoint存储到一个分布式的持久化系统中,比如HDFS。这样的话,当Flink的任务出现任何故障时,它都会从最近的一次Checkpoint将整个流的状态进行恢复,然后继续运行它的流处理。对用户没有任何数据上的影响。

Flink是如何做到在Checkpoint恢复过程中没有任何数据的丢失和数据的冗余?来保证精准计算的?

这其中原因是Flink利用了一套非常经典的Chandy-Lamport算法,它的核心思想是把这个流计算看成一个流式的拓扑,定期从这个拓扑的头部Source点开始插入特殊的Barries,从上游开始不断的向下游广播这个Barries。每一个节点收到所有的Barries,会将State做一次Snapshot,当每个节点都做完Snapshot之后,整个拓扑就算完整的做完了一次Checkpoint。接下来不管出现任何故障,都会从最近的Checkpoint进行恢复。

Flink利用这套经典的算法,保证了强一致性的语义。这也是Flink与其他无状态流计算引擎的核心区别。

下面介绍Flink是如何解决乱序问题的。比如星球大战的播放顺序,如果按照上映的时间观看,可能会发现故事在跳跃。

在流计算中,与这个例子是非常类似的。所有消息到来的时间,和它真正发生在源头,在线系统Log当中的时间是不一致的。在流处理当中,希望是按消息真正发生在源头的顺序进行处理,不希望是真正到达程序里的时间来处理。Flink提供了Event Time和WaterMark的一些先进技术来解决乱序的问题。使得用户可以有序的处理这个消息。这是Flink一个很重要的特点。

接下来要介绍的是Flink启动时的核心理念和核心概念,这是Flink发展的第一个阶段;第二个阶段时间是2015年和2017年,这个阶段也是Flink发展以及阿里巴巴介入的时间。故事源于2015年年中,我们在搜索事业部的一次调研。当时阿里有自己的批处理技术和流计算技术,有自研的,也有开源的。但是,为了思考下一代大数据引擎的方向以及未来趋势,我们做了很多新技术的调研。

结合大量调研结果,我们最后得出的结论是:解决通用大数据计算需求,批流融合的计算引擎,才是大数据技术的发展方向,并且最终我们选择了Flink。

但2015年的Flink还不够成熟,不管是规模还是稳定性尚未经历实践。最后我们决定在阿里内部建立一个Flink分支,对Flink做大量的修改和完善,让其适应阿里巴巴这种超大规模的业务场景。在这个过程当中,我们团队不仅对Flink在性能和稳定性上做出了很多改进和优化,同时在核心架构和功能上也进行了大量创新和改进,并将其贡献给社区,例如:Flink新的分布式架构,增量Checkpoint机制,基于Credit-based的网络流控机制和Streaming SQL等。

阿里巴巴对Flink社区的贡献

我们举两个设计案例,第一个是阿里巴巴重构了Flink的分布式架构,将Flink的Job调度和资源管理做了一个清晰的分层和解耦。这样做的首要好处是Flink可以原生的跑在各种不同的开源资源管理器上。经过这套分布式架构的改进,Flink可以原生地跑在Hadoop Yarn和Kubernetes这两个最常见的资源管理系统之上。同时将Flink的任务调度从集中式调度改为了分布式调度,这样Flink就可以支持更大规模的集群,以及得到更好的资源隔离。

另一个是实现了增量的Checkpoint机制,因为Flink提供了有状态的计算和定期的Checkpoint机制,如果内部的数据越来越多,不停地做Checkpoint,Checkpoint会越来越大,最后可能导致做不出来。提供了增量的Checkpoint后,Flink会自动地发现哪些数据是增量变化,哪些数据是被修改了。同时只将这些修改的数据进行持久化。这样Checkpoint不会随着时间的运行而越来越难做,整个系统的性能会非常地平稳,这也是我们贡献给社区的一个很重大的特性。

经过2015年到2017年对Flink Streaming的能力完善,Flink社区也逐渐成熟起来。Flink也成为在Streaming领域最主流的计算引擎。因为Flink最早期想做一个流批统一的大数据引擎,2018年已经启动这项工作,为了实现这个目标,阿里巴巴提出了新的统一API架构,统一SQL解决方案,同时流计算的各种功能得到完善后,我们认为批计算也需要各种各样的完善。无论在任务调度层,还是在数据Shuffle层,在容错性,易用性上,都需要完善很多工作。

篇幅原因,下面主要和大家分享两点:

● 统一 API Stack

● 统一 SQL方案

先来看下目前Flink API Stack的一个现状,调研过Flink或者使用过Flink的开发者应该知道。Flink有2套基础的API,一套是DataStream,一套是DataSet。DataStream API是针对流式处理的用户提供,DataSet API是针对批处理用户提供,但是这两套API的执行路径是完全不一样的,甚至需要生成不同的Task去执行。所以这跟得到统一的API是有冲突的,而且这个也是不完善的,不是最终的解法。在Runtime之上首先是要有一个批流统一融合的基础API层,我们希望可以统一API层。

因此,我们在新架构中将采用一个DAG(有限无环图)API,作为一个批流统一的API层。对于这个有限无环图,批计算和流计算不需要泾渭分明的表达出来。只需要让开发者在不同的节点,不同的边上定义不同的属性,来规划数据是流属性还是批属性。整个拓扑是可以融合批流统一的语义表达,整个计算无需区分是流计算还是批计算,只需要表达自己的需求。有了这套API后,Flink的API Stack将得到统一。

除了统一的基础API层和统一的API Stack外,同样在上层统一SQL的解决方案。流和批的SQL,可以认为流计算有数据源,批计算也有数据源,我们可以将这两种源都模拟成数据表。可以认为流数据的数据源是一张不断更新的数据表,对于批处理的数据源可以认为是一张相对静止的表,没有更新的数据表。整个数据处理可以当做SQL的一个Query,最终产生的结果也可以模拟成一个结果表。

对于流计算而言,它的结果表是一张不断更新的结果表。对于批处理而言,它的结果表是相当于一次更新完成的结果表。从整个SOL语义上表达,流和批是可以统一的。此外,不管是流式SQL,还是批处理SQL,都可以用同一个Query来表达复用。这样以来流批都可以用同一个Query优化或者解析。甚至很多流和批的算子都是可以复用的。

Flink的未来方向

首先,阿里巴巴还是要立足于Flink的本质,去做一个全能的统一大数据计算引擎。将它在生态和场景上进行落地。目前Flink已经是一个主流的流计算引擎,很多互联网公司已经达成了共识:Flink是大数据的未来,是最好的流计算引擎。下一步很重要的工作是让Flink在批计算上有所突破。在更多的场景下落地,成为一种主流的批计算引擎。然后进一步在流和批之间进行无缝的切换,流和批的界限越来越模糊。用Flink,在一个计算中,既可以有流计算,又可以有批计算。

第二个方向就是Flink的生态上有更多语言的支持,不仅仅是Java,Scala语言,甚至是机器学习下用的Python,Go语言。未来我们希望能用更多丰富的语言来开发Flink计算的任务,来描述计算逻辑,并和更多的生态进行对接。

最后不得不说AI,因为现在很多大数据计算的需求和数据量都是在支持很火爆的AI场景,所以在Flink流批生态完善的基础上,将继续往上走,完善上层Flink的Machine Learning算法库,同时Flink往上层也会向成熟的机器学习,深度学习去集成。比如可以做Tensorflow On Flink, 让大数据的ETL数据处理和机器学习的Feature计算和特征计算,训练的计算等进行集成,让开发者能够同时享受到多种生态给大家带来的好处。