『壹』 【Flink 精選】闡述 Flink 的容錯機制,剖析 Checkpoint 實現流程
Flink 容錯機制主要是 狀態的保存和恢復,涉及 state backends 狀態後端、checkpoint 和 savepoint,還有 Job 和 Task 的錯誤恢復 。
Flink 狀態後端是指 保存 Checkpoint 數據的容器 ,其分類有 MemoryStateBackend、FsStateBackend、RocksDBStateBackend ,狀態的分類有 operator state 和 keyed state 。
Flink 狀態保存和恢復主要依靠 Checkpoint 機制和 Savepoint 機制,兩者的區別如下表所示。
快照的概念來源於相片,指照相館的一種沖洗過程短的照片。在計算機領域, 快照是數據存儲的某一時刻的狀態記錄 。 Flink Snapshot 快照是指作業狀態的全局一致記錄 。一個完整的快照是包括 source 運算元的狀態(例如,消費 kafka partition 的 offset)、狀態運算元的緩存數據和 sink 運算元的狀態(批量緩存數據、事務數據等)。
Checkpoint 檢查點可以自動產生快照,用於Flink 故障恢復 。Checkpoint 具有分布式、非同步、增量的特點。
Savepoint 保存點是用戶手動觸發的,保存全量的作業狀態數據 。一般使用場景是作業的升級、作業的並發度縮放、遷移集群等。
Flink 是採用輕量級的分布式非同步快照,其實現是採用柵欄 barrier 作為 checkpoint 的傳遞信號,與業務數據一樣無差別地傳遞下去 ,目的是使得數據流被切分成微批,進行 checkpoint 保存為 snapshot。當 barrier 經過流圖節點的時候,Flink 進行 checkpoint 保存狀態數據。
如下圖所示,checkpoint n 包含每個運算元的狀態,該狀態是指checkpoint n 之前的全部事件,而不包含它之後的所有事件。
針對用戶作業出現故障而導致結果丟失或者重復的問題,Flink 提供3種語義:
① At-Least-Once 最少一次 :不會丟失數據,但可能會有重復結果。
② Exactly-Once 精確一次 :checkpoint barrier 對齊機制可以保障精確一次。
① FailureRateRestartStrategy :允許在指定時間間隔內的最大失敗次數,同時可以設置重啟延時時間。
② FixedDelayRestartStrategy :允許指定的失敗次數,同時可以設置重啟延時時間。
③ NoRestartStrategy :不需要重啟,即 Job 直接失敗。
④ ThrowingRestartStrategy :不需要重啟,直接拋異常。
Job Restart 策略可以通過 env 設置。
上述策略的父類介面是RestartStrategy,其關鍵是restart(重啟操作)。
① RestartAllStrategy :重啟全部 task,默認策略。
② RestartIndivialStrategy :恢復單個 task。如果該 task 沒有source,可能導致數據丟失。
③ NoOpFailoverStrategy :不恢復 task。
上述策略的父類介面是FailoverStrategy,其關鍵是Factory的create(創建 strategy)、onTaskFailure(處理錯誤)。
如何產生可靠的全局一致性快照是分布式系統的難點,其傳統方案是使用的全局時鍾,但存在單點故障、數據不一致等可靠性問題 。為了解決該問題, Chandy-Lamport 演算法採用 marker 的傳播來代替全局時鍾 。
① 進程 Pi 記錄自己的進程狀態,同時生產一個標識信息 marker(與正常 message 不同),通過 ouput channel 發送給系統裡面的其他進程。
② 進程 Pi 開始記錄所有 input channel 接收到的 message
進程 Pj 從 input channel Ckj 接收到 marker。如果 Pj 還沒有記錄自己的進程狀態,則 Pj 記錄自己的進程狀態,向 output channel 發送 marker;否則 Pj 正在記錄自己的進程狀態(該 marker 之前的 message)。
所有的進程都收到 marker 信息並且記錄下自己的狀態和 channel 的狀態(包含的 message)。
Flink 的分布式非同步快照實現了Chandy Lamport 演算法,其核心思想是 在 source 插入 barrier 代替 Chandy-Lamport 演算法中的 marker,通過控制 barrier 的同步來實現 snapshot 的備份和 Exactly-Once 語義 。
Checkpoint Coordinator 向所有 source 節點 trigger Checkpoint。
source task向下游廣播barrier。
當source task備份完自己的狀態後,會將備份數據的地址(state handle)通知 Checkpoint Coordinator。
map和sink task收集齊上游source的barrier n,執行本地快照。下面例子是RocksDB增量Checkpoint 的流程:首先RocksDB會全量保存到磁碟上(紅色大三角表示),然後Flink會從中選擇沒有上傳的文件進行持久化備份(紫色小三角)。
map和sink task在完成Checkpoint 之後,將狀態地址state handle返回通知 Coordinator。
當Checkpoint Coordinator收到全部task的state handle,就確定該Checkpoint已完成,並向持久化存儲中備份一個Checkpoint Meta(元數據,包括該checkpoint狀態數據的備份地址)。
『貳』 flink配置和內存
1.Hosts and Ports
metrics.internal.query-service.port "0" String Accepts a list of ports (「50100,50101」), ranges(「50100-50200」) or a combination of both.
rest.bind-address (none) String
rest.bind-port "8081" String
taskmanager.data.port 0 Integer 任務管理器的外部埠,用於數據交換操作。
taskmanager.host (none) String
taskmanager.rpc.port "0" String
2.Fault Tolerance
restart-strategy 重啟策略 默認空
restart-strategy.fixed-delay.attempts 固定周期重啟
restart-strategy.fixed-delay.delay 固定周期重啟
restart-strategy.failure-rate.delay 失敗率重啟
restart-strategy.failure-rate.failure-rate-interval 失敗率重啟
restart-strategy.failure-rate.max-failures-per-interval 失敗率重啟
state.backend.incremental 增量checkpoint(僅對rocksdb支持)
state.backend.local-recovery 狀態後端配置本地恢復,默認false,只支持鍵控狀態後端
state.checkpoints.num-retained 保留的最大已完成檢查點數,默認1
taskmanager.state.local.root-dirs 本地恢復的根目錄
3.High Availability
high-availability 默認為NONE To enable high-availability, set this mode to "ZOOKEEPER" or specify FQN of factory class.
high-availability.cluster-id 高可用flink集群ID
high-availability.storageDir 元數據路徑
high-availability.zookeeper.path.root 配置ZK路徑
high-availability.zookeeper.quorum 配置ZK集群
4.Memory Configuration
在大多數情況下,用戶只需要設置值taskmanager.memory.process.size或taskmanager.memory.flink.size(取決於設置方式),並可能通過調整JVM堆與託管內存的比率taskmanager.memory.managed.fraction。
jobmanager.memory.enable-jvm-direct-memory-limit 是否啟用jm進程的JVM直接內存限制,默認false
jobmanager.memory.flink.size 默認none。這包括JobManager消耗的所有內存。非容器配置
jobmanager.memory.heap.size 默認none。默認為總內存減去JVM,network,managed等
jobmanager.memory.jvm-metaspace.size jm的元空間大小,默認256mb
jobmanager.memory.jvm-overhead.fraction jm保留總進程內存的比例,默認0.1
jobmanager.memory.jvm-overhead.max 最大JVM開銷,默認1gb
jobmanager.memory.jvm-overhead.min 最小JVM開銷,默認192mb
jobmanager.memory.off-heap.size jm的堆外內存,默認128mb,如果第一個參數被啟用,這個將生效
jobmanager.memory.process.size JobManager的總進程內存大小。容器化配置這個,為容器總大小
taskmanager.memory.flink.size TaskExecutor的總Flink內存大小。默認none,非容器配置
taskmanager.memory.framework.heap.size TaskExecutor的框架堆內存大小。默認128mb
taskmanager.memory.framework.off-heap.size TaskExecutor的框架堆外內存大小。默認128mb
taskmanager.memory.jvm-metaspace.size TaskExecutor的JVM元空間大小。默認256mb
taskmanager.memory.jvm-overhead.fraction 要為JVM開銷保留的總進程內存的分數。默認0.1
taskmanager.memory.jvm-overhead.max TaskExecutor的最大JVM開銷最大大小,默認1gb
taskmanager.memory.jvm-overhead.min TaskExecutor的最小JVM開銷大小。默認192mb
taskmanager.memory.managed.consumer-weights 消費者權重。DATAPROC(用於流式RocksDB狀態後端和批量內置演算法)和PYTHON(用於Python進程),默認DATAPROC:70,PYTHON:30
taskmanager.memory.managed.fraction 如果未顯式指定託管內存大小,則將用作託管內存的Flink總內存的分數,默認0.4
taskmanager.memory.managed.size TaskExecutor的託管內存大小。默認none
taskmanager.memory.network.fraction 用作網路內存的總Flink內存的分數,默認0.1
taskmanager.memory.network.max TaskExecutor的最大網路內存大小。默認1gb
taskmanager.memory.network.min TaskExecutor的最小網路內存大小。默認64mb
taskmanager.memory.process.size TaskExecutor的總進程內存大小。默認none,容器配置
taskmanager.memory.task.heap.size tm內存,默認none,
taskmanager.memory.task.off-heap.size tm堆外內存,默認0
5.Miscellaneous Options
fs.allowed-fallback-filesystems none
fs.default-scheme none
io.tmp.dirs 'LOCAL_DIRS' on Yarn. '_FLINK_TMP_DIR' on Mesos. System.getProperty("java.io.tmpdir") in standalone.
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/memory/mem_setup_tm.html
『叄』 [Flink State] State究竟保存在哪裡
從源碼解析State的保存過程 ,上一篇從task和operator出發說明了保存state的過程,到最後是由運算元調用snapshot方法,進行state的快照操作。那麼state究竟保存在哪裡?
<1> State Backend簡介
關於 Raw Bytes Storage and Backends
總結: StateBackend主要是針對raw bytes storage(即checkpoint),keyed state和operator state來提供功能的,其中checkpoint數據的存儲則是通過CheckpointStreamFactory,而state存儲,針對keyedState是通過AbstractKeyedStateBackend,針對operatorState是通過OperatorStateBackend。
<2>
RocksDBStateBackend的構造函數可以傳入一個AbstractStateBackend,否則默認採用FsStateBackend。
可以看到,從OperatorState的角度來講,目前Flink只有一個實現,即DefaultOperatorStateBackend,它將List風格的State保存在內存中。
從KeyedState的角度來講,目前有兩種實現,HeapKeyedStateBackend將state保存在內存中,而RocksDbKeyedStateBackend將State保存在TM本地的RocksDB中。相對而言,前者在內存中,速度會快,效率高,但一方面會限制state的大小,另一方面也會造成JVM自己的內存問題;後者在本地文件中,就會涉及序列化和反序列化,效率不及前者,但可以保存的state的大小會很大。
從checkpoint和savepoint的角度來看,Memory工廠方法都保存在內存中,顯然不能在生產環境使用,而Fs工廠方法和RocksDb工廠方法,則統一都放在文件系統中,比如HDFS。
從上圖中3,4兩行可以看到,具體用來存儲state的有三種HeapKeyedStateBackend,RocksDBKeyedStateBackend和DefaultOperatorStateBackend。
DefaultOperatorStateBackend
(1)
operator的ListSate的實現類PartitionableListState, OperatorState都保存在內存中,本質上還是一個ArrayList。
(2)snapshot方法
該snapshotStrategy是AbstractSnapshotStrategy<OperatorStateHandle>,而AbstractSnapshotStrategy有三種實現類:
中的snapshot方法:
該snapshot方法中,主要是對 registeredOperatorStates 和 registeredBroadcastStates 的snapshot。
第一步:
針對所有注冊的state進行deepCopy,為了防止在checkpoint的時候數據結構又被修改,deepCopy其實是通過序列化和反序列化的過程;
第二步:
非同步寫入State和MetaInfo,先創建CheckpointStateOutputStream,通過調用factory的方法,這個factory是哪種類型的呢?這個是由定義的狀態後端所決定的。之後會返回相應的OperatorstateHandle用作restore的過程。
第三步:
在StreamTask觸發checkpoint的時候會將一個Task中所有的operator觸發一次snapshot,觸發部分就是上面1,2兩個步驟,其中第二步是會返回一個RunnableFuture,在觸發之後會提交一個AsyncSnapshotCallable非同步任務,會阻塞一直等到checkpoint的Future,其實就是去調用這個方法AbstractAsyncIOCallable, 直到完成之後OperatorState會返回一個OperatorStateHandle,這個地方和後文的keyedState返回的handle不一樣。
『肆』 Flink基礎系列28-Flink容錯機制
在執行流應用程序期間,Flink 會定期保存狀態的一致檢查點
如果發生故障, Flink 將會使用最近的檢查點來一致恢復應用程序的狀態,並重新啟動處理流程
(如圖中所示,7這個數據被source讀到了,准備傳給奇數流時,奇數流宕機了,數據傳輸發生中斷)
遇到故障之後,第一步就是重啟應用
(重啟後,起初流都是空的)
第二步是從 checkpoint 中讀取狀態,將狀態重置
(讀取在遠程倉庫(Storage,這里的倉庫指狀態後端保存數據指定的三種方式之一)保存的狀態)
從檢查點重新啟動應用程序後,其內部狀態與檢查點完成時的狀態完全相同
第三步:開始消費並處理檢查點到發生故障之間的所有數據
這種檢查點的保存和恢復機制可以為應用程序狀態提供「精確一次」(exactly-once)的一致性,因為所有運算元都會保存檢查點並恢復其所有狀態,這樣一來所有的輸入流就都會被重置到檢查點完成時的位置
(這里要求source源也能記錄狀態,回退到讀取數據7的狀態,kafka有相應的偏移指針能完成該操作)
概述
checkpoint和Watermark一樣,都會以廣播的形式告訴所有下游。
具體講解
JobManager 會向每個 source 任務發送一條帶有新檢查點 ID 的消息,通過這種方式來啟動檢查點
(這個帶有新檢查點ID的東西為barrier,由圖中三角型表示,數值2隻是ID)
數據源將它們的狀態寫入檢查點,並發出一個檢查點barrier
狀態後端在狀態存入檢查點之後,會返回通知給source任務,source任務就會向JobManager確認檢查點完成
上圖,在Source端接受到barrier後,將自己此身的3 和 4 的數據的狀態寫入檢查點,且向JobManager發送checkpoint成功的消息,然後向下游分別發出一個檢查點 barrier
可以看出在Source接受barrier時,數據流也在不斷的處理,不會進行中斷
此時的偶數流已經處理完藍2變成了4,但是還沒處理到黃4,只是下游sink發送了一個數據4,而奇數流已經處理完藍3變成了8(黃1+藍1+黃3+藍3),並向下游sink發送了8
此時檢查點barrier都還未到Sum_odd奇數流和Sum_even偶數流
分界線對齊:barrier向下游傳遞,sum任務會等待所有輸入分區的barrier到達
對於barrier已經達到的分區,繼續到達的數據會被緩存
而barrier尚未到達的分區,數據會被正常處理
此時藍色流的barrier先一步抵達了偶數流,黃色的barrier還未到,但是因為數據的不中斷一直處理,此時的先到的藍色的barrier會將此時的偶數流的數據4進行緩存處理,流接著處理接下來的數據等待著黃色的barrier的到來,而黃色barrier之前的數據將會對緩存的數據相加
這次處理的總結:分界線對齊:barrier 向下游傳遞,sum 任務會等待所有輸入分區的 barrier 到達,對於barrier已經到達的分區,繼續到達的數據會被緩存。而barrier尚未到達的分區,數據會被正常處理
當收到所有輸入分區的 barrier 時,任務就將其狀態保存到狀態後端的檢查點中,然後將 barrier 繼續向下游轉發
當藍色的barrier和黃色的barrier(所有分區的)都到達後,進行狀態保存到遠程倉庫,然後對JobManager發送消息,說自己的檢查點保存完畢了
此時的偶數流和奇數流都為8
向下游轉發檢查點 barrier 後,任務繼續正常的數據處理
Sink 任務向 JobManager 確認狀態保存到 checkpoint 完畢
當所有任務都確認已成功將狀態保存到檢查點時,檢查點就真正完成了
CheckPoint為自動保存,SavePoint為手動保存
有狀態的流處理,內部每個運算元任務都可以有自己的狀態
對於流處理器內部來說,所謂的狀態一致性,其實就是我們所說的計算結果要保證准確。
一條數據不應該丟失,也不應該重復計算
在遇到故障時可以恢復狀態,恢復以後的重新計算,結果應該也是完全正確的。
Flink的一個重大價值在於,它既保證了exactly-once,也具有低延遲和高吞吐的處理能力。
Flink使用了一種輕量級快照機制——檢查點(checkpoint)來保證exactly-once語義
有狀態流應用的一致檢查點,其實就是:所有任務的狀態,在某個時間點的一份備份(一份快照)。而這個時間點,應該是所有任務都恰好處理完一個相同的輸入數據的時間。
應用狀態的一致檢查點,是Flink故障恢復機制的核心
端到端(end-to-end)狀態一致性
目前我們看到的一致性保證都是由流處理器實現的,也就是說都是在Flink流處理器內部保證的;而在真實應用中,流處理應用除了流處理器以外還包含了數據源(例如Kafka)和輸出到持久化系統
端到端的一致性保證,意味著結果的正確性貫穿了整個流處理應用的始終;每一個組件都保證了它自己的一致性
整個端到端的一致性級別取決於所有組件中一致性最弱的組件
端到端 exactly-once
冪等寫入
所謂冪等操作,是說一個操作,可以重復執行很多次,但只導致一次結果更改,也就是說,後面再重復執行就不起作用了。
(中間可能會存在不正確的情況,只能保證最後結果正確。比如5=>10=>15=>5=>10=>15,雖然最後是恢復到了15,但是中間有個恢復的過程,如果這個過程能夠被讀取,就會出問題。)
事務寫入
預寫日誌(Write-Ahead-Log,WAL)
把結果數據先當成狀態保存,然後在收到checkpoint完成的通知時,一次性寫入sink系統
簡單易於實現,由於數據提前在狀態後端中做了緩存,所以無論什麼sink系統,都能用這種方式一批搞定
DataStream API提供了一個模版類:GenericWriteAheadSink,來實現這種事務性sink
兩階段提交(Two-Phase-Commit,2PC)
對於每個checkpoint,sink任務會啟動一個事務,並將接下來所有接收到的數據添加到事務里
然後將這些數據寫入外部sink系統,但不提交它們——這時只是"預提交"
這種方式真正實現了exactly-once,它需要一個提供事務支持的外部sink系統。Flink提供了TwoPhaseCommitSinkFunction介面
不同Source和Sink的一致性保證
內部——利用checkpoint機制,把狀態存檔,發生故障的時候可以恢復,保證內部的狀態一致性
source——kafka consumer作為source,可以將偏移量保存下來,如果後續任務出現了故障,恢復的時候可以由連接器重製偏移量,重新消費數據,保證一致性
sink——kafka procer作為sink,採用兩階段提交sink,需要實現一個TwoPhaseCommitSinkFunction
Exactly-once 兩階段提交
JobManager 協調各個 TaskManager 進行 checkpoint 存儲
checkpoint保存在 StateBackend中,默認StateBackend是內存級的,也可以改為文件級的進行持久化保存
當 checkpoint 啟動時,JobManager 會將檢查點分界線(barrier)注入數據流
barrier會在運算元間傳遞下去
每個運算元會對當前的狀態做個快照,保存到狀態後端
checkpoint 機制可以保證內部的狀態一致性
每個內部的 transform 任務遇到 barrier 時,都會把狀態存到 checkpoint 里
sink 任務首先把數據寫入外部 kafka,這些數據都屬於預提交的事務;遇到 barrier 時,把狀態保存到狀態後端,並開啟新的預提交事務
(barrier之前的數據還是在之前的事務中沒關閉事務,遇到barrier後的數據另外新開啟一個事務)
當所有運算元任務的快照完成,也就是這次的 checkpoint 完成時,JobManager 會向所有任務發通知,確認這次 checkpoint 完成
sink 任務收到確認通知,正式提交之前的事務,kafka 中未確認數據改為「已確認」
Exactly-once 兩階段提交步驟總結
『伍』 Flink 原理詳解
Flink 是一個流處理框架,支持流處理和批處理,特點是流處理有限,可容錯,可擴展,高吞吐,低延遲。
流處理是處理一條,立馬下一個節點會從緩存中取出,在下一個節點進行計算
批處理是只有處理一批完成後,才會經過網路傳輸到下一個節點
流處理的優點是低延遲 批處理的優點是高吞吐
flink同時支持兩種,flink的網路傳輸是設計固定的緩存塊為單位,用戶可以設置緩存塊的超時值來決定換存塊什麼時候進行傳輸。 數據大於0 進行處理就是流式處理。
如果設置為無限大就是批處理模型。
Flink 集群包括 JobManager 和 TaskManager .
JobManager 主要負責調度 Job 並協調 Task 做 checkpoint,職責上很像 Storm 的 Nimbus。從 Client 處接收到 Job 和 JAR 包 等資源後,會生成優化後的執行計劃,並以 Task 的單元調度到各個 TaskManager 去執行。
TaskManager 在啟動的時候就設置好了槽位數(Slot),每個 slot 能啟動一個 Task,Task 為線程。從 JobManager 處接收需要 部署的 Task,部署啟動後,與自己的上游建立 Netty 連接,接收數據並處理。
flink on yarn 是由client 提交 app到 RM 上, 然後RM 分配一個 AppMaster負責運行 Flink JobManager 和 Yarn AppMaster, 然後 AppMaster 分配 容器去運行 Flink TaskManger
SparkStreaming 是將流處理分成微批處理的作業, 最後的處理引擎是spark job
Spark Streaming把實時輸入數據流以時間片Δt (如1秒)為單位切分成塊,Spark Streaming會把每塊數據作為一個RDD,並使用RDD操作處理每一小塊數據。每個塊都會生成一個Spark Job處理,然後分批次提交job到集群中去運行,運行每個 job的過程和真正的spark 任務沒有任何區別。
JobScheler, 負責 Job的調度通過定時器每隔一段時間根據Dstream的依賴關系生一個一個DAG圖
ReceiverTracker負責數據的接收,管理和分配
ReceiverTracker在啟動Receiver的時候他有ReceiverSupervisor,其實現是ReceiverSupervisorImpl, ReceiverSupervisor本身啟 動的時候會啟動Receiver,Receiver不斷的接收數據,通過BlockGenerator將數據轉換成Block。定時器會不斷的把Block數據通會不斷的把Block數據通過BlockManager或者WAL進行存儲,數據存儲之後ReceiverSupervisorlmpl會把存儲後的數據的元數據Metadate匯報給ReceiverTracker,其實是匯報給ReceiverTracker中的RPC實體ReceiverTrackerEndpoin
spark on yarn 的cluster模式, Spark client 向RM提交job請求, RM會分配一個 AppMaster, driver 和 運行在AppMAster節點里, AM然後把Receiver作為一個Task提交給Spark Executor 節點, Receive啟動接受數據,生成數據塊,並通知Spark Appmaster, AM會根據數據塊生成相應的Job, 並把Job 提交給空閑的 Executor 去執行。
1:需要關注流數據是否需要進行狀態管理
2:At-least-once或者Exectly-once消息投遞模式是否有特殊要求
3:對於小型獨立的項目,並且需要低延遲的場景,建議使用storm
4:如果你的項目已經使用了spark,並且秒級別的實時處理可以滿足需求的話,建議使用sparkStreaming
5:要求消息投遞語義為 Exactly Once 的場景;數據量較大,要求高吞吐低延遲的場景;需要進行狀態管理或窗口統計的場景,建議使用flink
Flink 提供的Api右 DataStream 和 DataSet ,他們都是不可變的數據集合,不可以增加刪除中的元素, 通過 Source 創建 DataStream 和 DataSet
在創建運行時有:
Flink的每一個Operator稱為一個任務, Operator 的每一個實例稱為子任務,每一個任務在JVM線程中執行。可以將多個子任務鏈接成一個任務,減少上下文切換的開銷,降低延遲。
source 和 運算元map 如果是 one by one 的關系,他們的數據交換可以通過緩存而不是網路通信
TaskManager 為控制執行任務的數量,將計算資源劃分多個slot,每個slot獨享計算資源,這種靜態分配利於任務資源隔離。
同一個任務可以共享一個slot, 不同作業不可以。
這里因為 Source 和 Map 並行度都是4 採用直連方式,他們的數據通信採用緩存形式
所以一共需要兩個TaskManager source,Map 一個,rece一個, 每個TaskManager 要3個slot
JobManager 將 JobGraph 部署 ExecutionGraph
設置的並行度,可以讓一個ExecJobVertex 對應 多個並行的ExecVertex 實例。
Flink通過狀態機管理 ExecGraph的作業執行進度。
Flink 將對象序列化為固定數量的預先分配的內存段,而不是直接把對象放在堆內存上。
Flink TaskManager 是由幾個內部組件組成的:actor 系統(負責與 Flink master 協調)、IOManager(負責將數據溢出到磁碟並將其讀取回來)、MemoryManager(負責協調內存使用。
數據源:
Sink:
時間:
處理時間:取自Operator的機器系統時間
事件時間: 由數據源產生
進入時間: 被Source節點觀察時的系統時間
如果數據源沒有自己正確創建水印,程序必須自己生成水印來確保基於事件的時間窗口可以正常工作。。
DataStream 提供了 周期性水印,間歇式水印,和遞增式水印
『陸』 Flink:特性、概念、組件棧、架構及原理分析
簡單之美 | Apache Flink:特性、概念、組件棧、架構及原理分析
http://shiyanjun.cn/archives/1508.html
Apache Flink是一個面向分布式數據流處理和批量數據處理的開源計算平台,它能夠基於同一個Flink運行時(Flink Runtime),提供支持流處理和批處理兩種類型應用的功能。現有的開源計算方案,會把流處理和批處理作為兩種不同的應用類型,因為他們它們所提供的SLA是完全不相同的:流處理一般需要支持低延遲、Exactly-once保證,而批處理需要支持高吞吐、高效處理,所以在實現的時候通常是分別給出兩套實現方法,或者通過一個獨立的開源框架來實現其中每一種處理方案。例如,實現批處理的開源方案有MapRece、Tez、Crunch、Spark,實現流處理的開源方案有Samza、Storm。Flink在實現流處理和批處理時,與傳統的一些方案完全不同,它從另一個視角看待流處理和批處理,將二者統一起來:Flink是完全支持流處理,也就是說作為流處理看待時輸入數據流是無界的;批處理被作為一種特殊的流處理,只是它的輸入數據流被定義為有界的。基於同一個Flink運行時(Flink Runtime),分別提供了流處理和批處理API,而這兩種API也是實現上層面向流處理、批處理類型應用框架的基礎。
基本特性
關於Flink所支持的特性,我這里只是通過分類的方式簡單做一下梳理,涉及到具體的一些概念及其原理會在後面的部分做詳細說明。
流處理特性
支持高吞吐、低延遲、高性能的流處理
支持帶有事件時間的窗口(Window)操作
支持有狀態計算的Exactly-once語義
支持高度靈活的窗口(Window)操作,支持基於time、count、session,以及data-driven的窗口操作
支持具有Backpressure功能的持續流模型
支持基於輕量級分布式快照(Snapshot)實現的容錯
一個運行時同時支持Batch on Streaming處理和Streaming處理
Flink在JVM內部實現了自己的內存管理
支持迭代計算
支持程序自動優化:避免特定情況下Shuffle、排序等昂貴操作,中間結果有必要進行緩存
API支持
對Streaming數據類應用,提供DataStream API
對批處理類應用,提供DataSet API(支持Java/Scala)
Libraries支持
支持機器學習(FlinkML)
支持圖分析(Gelly)
支持關系數據處理(Table)
支持復雜事件處理(CEP)
整合支持
支持Flink on YARN
支持HDFS
支持來自Kafka的輸入數據
支持Apache HBase
支持Hadoop程序
支持Tachyon
支持ElasticSearch
支持RabbitMQ
支持Apache Storm
支持S3
支持XtreemFS
基本概念
Stream & Transformation & Operator
用戶實現的Flink程序是由Stream和Transformation這兩個基本構建塊組成,其中Stream是一個中間結果數據,而Transformation是一個操作,它對一個或多個輸入Stream進行計算處理,輸出一個或多個結果Stream。當一個Flink程序被執行的時候,它會被映射為Streaming Dataflow。一個Streaming Dataflow是由一組Stream和Transformation Operator組成,它類似於一個DAG圖,在啟動的時候從一個或多個Source Operator開始,結束於一個或多個Sink Operator。下面是一個由Flink程序映射為Streaming Dataflow的示意圖,如下所示:
比如從Source[1]到map()[1],它保持了Source的分區特性(Partitioning)和分區內元素處理的有序性,也就是說map()[1]的Subtask看到數據流中記錄的順序,與Source[1]中看到的記錄順序是一致的。
Redistribution模式
這種模式改變了輸入數據流的分區,比如從map()[1]、map()[2]到keyBy()/window()/apply()[1]、keyBy()/window()/apply()[2],上游的Subtask向下游的多個不同的Subtask發送數據,改變了數據流的分區,這與實際應用所選擇的Operator有關系。另外,Source Operator對應2個Subtask,所以並行度為2,而Sink Operator的Subtask只有1個,故而並行度為1。
Task & Operator Chain
在Flink分布式執行環境中,會將多個Operator Subtask串起來組成一個Operator Chain,實際上就是一個執行鏈,每個執行鏈會在TaskManager上一個獨立的線程中執行,如下圖所示:
在Flink集群啟動的時候,TaskManager會向JobManager注冊,如果注冊成功,則JobManager會向TaskManager回復消息AcknowledgeRegistration。
SubmitJob
Flink程序內部通過Client向JobManager提交Flink Job,其中在消息SubmitJob中以JobGraph形式描述了Job的基本信息。
CancelJob
請求取消一個Flink Job的執行,CancelJob消息中包含了Job的ID,如果成功則返回消息CancellationSuccess,失敗則返回消息CancellationFailure。
UpdateTaskExecutionState
TaskManager會向JobManager請求更新ExecutionGraph中的ExecutionVertex的狀態信息,更新成功則返回true。
RequestNextInputSplit
運行在TaskManager上面的Task,請求獲取下一個要處理的輸入Split,成功則返回NextInputSplit。
JobStatusChanged
ExecutionGraph向JobManager發送該消息,用來表示Flink Job的狀態發生的變化,例如:RUNNING、CANCELING、FINISHED等。
TaskManager
TaskManager也是一個Actor,它是實際負責執行計算的Worker,在其上執行Flink Job的一組Task。每個TaskManager負責管理其所在節點上的資源信息,如內存、磁碟、網路,在啟動的時候將資源的狀態向JobManager匯報。TaskManager端可以分成兩個階段:
注冊階段
TaskManager會向JobManager注冊,發送RegisterTaskManager消息,等待JobManager返回AcknowledgeRegistration,然後TaskManager就可以進行初始化過程。
可操作階段
該階段TaskManager可以接收並處理與Task有關的消息,如SubmitTask、CancelTask、FailTask。如果TaskManager無法連接到JobManager,這是TaskManager就失去了與JobManager的聯系,會自動進入「注冊階段」,只有完成注冊才能繼續處理Task相關的消息。
Client
當用戶提交一個Flink程序時,會首先創建一個Client,該Client首先會對用戶提交的Flink程序進行預處理,並提交到Flink集群中處理,所以Client需要從用戶提交的Flink程序配置中獲取JobManager的地址,並建立到JobManager的連接,將Flink Job提交給JobManager。Client會將用戶提交的Flink程序組裝一個JobGraph, 並且是以JobGraph的形式提交的。一個JobGraph是一個Flink Dataflow,它由多個JobVertex組成的DAG。其中,一個JobGraph包含了一個Flink程序的如下信息:JobID、Job名稱、配置信息、一組JobVertex等。
組件棧
Flink是一個分層架構的系統,每一層所包含的組件都提供了特定的抽象,用來服務於上層組件。Flink分層的組件棧如下圖所示:
了解YARN的話,對上圖的原理非常熟悉,實際Flink也實現了滿足在YARN集群上運行的各個組件:Flink YARN Client負責與YARN RM通信協商資源請求,Flink JobManager和Flink TaskManager分別申請到Container去運行各自的進程。通過上圖可以看到,YARN AM與Flink JobManager在同一個Container中,這樣AM可以知道Flink JobManager的地址,從而AM可以申請Container去啟動Flink TaskManager。待Flink成功運行在YARN集群上,Flink YARN Client就可以提交Flink Job到Flink JobManager,並進行後續的映射、調度和計算處理。
Runtime層
Runtime層提供了支持Flink計算的全部核心實現,比如:支持分布式Stream處理、JobGraph到ExecutionGraph的映射、調度等等,為上層API層提供基礎服務。
API層
API層主要實現了面向無界Stream的流處理和面向Batch的批處理API,其中面向流處理對應DataStream API,面向批處理對應DataSet API。
Libraries層
該層也可以稱為Flink應用框架層,根據API層的劃分,在API層之上構建的滿足特定應用的實現計算框架,也分別對應於面向流處理和面向批處理兩類。面向流處理支持:CEP(復雜事件處理)、基於sql-like的操作(基於Table的關系操作);面向批處理支持:FlinkML(機器學習庫)、Gelly(圖處理)。
內部原理
容錯機制
Flink基於Checkpoint機制實現容錯,它的原理是不斷地生成分布式Streaming數據流Snapshot。在流處理失敗時,通過這些Snapshot可以恢復數據流處理。理解Flink的容錯機制,首先需要了解一下Barrier這個概念:Stream Barrier是Flink分布式Snapshotting中的核心元素,它會作為數據流的記錄被同等看待,被插入到數據流中,將數據流中記錄的進行分組,並沿著數據流的方向向前推進。每個Barrier會攜帶一個Snapshot ID,屬於該Snapshot的記錄會被推向該Barrier的前方。因為Barrier非常輕量,所以並不會中斷數據流。帶有Barrier的數據流,如下圖所示:
接收到Barrier n的Stream被臨時擱置,來自這些Stream的記錄不會被處理,而是被放在一個Buffer中
一旦最後一個Stream接收到Barrier n,Operator會emit所有暫存在Buffer中的記錄,然後向Checkpoint Coordinator發送Snapshot n
繼續處理來自多個Stream的記錄
基於Stream Aligning操作能夠實現Exactly Once語義,但是也會給流處理應用帶來延遲,因為為了排列對齊Barrier,會暫時緩存一部分Stream的記錄到Buffer中,尤其是在數據流並行度很高的場景下可能更加明顯,通常以最遲對齊Barrier的一個Stream為處理Buffer中緩存記錄的時刻點。在Flink中,提供了一個開關,選擇是否使用Stream Aligning,如果關掉則Exactly Once會變成At least once。
調度機制
在JobManager端,會接收到Client提交的JobGraph形式的Flink Job,JobManager會將一個JobGraph轉換映射為一個ExecutionGraph,如下圖所示:
迭代機制
機器學習和圖計算應用,都會使用到迭代計算,Flink通過在迭代Operator中定義Step函數來實現迭代演算法,這種迭代演算法包括Iterate和Delta Iterate兩種類型,在實現上它們反復地在當前迭代狀態上調用Step函數,直到滿足給定的條件才會停止迭代。下面,對Iterate和Delta Iterate兩種類型的迭代演算法原理進行說明:
Iterate
Iterate Operator是一種簡單的迭代形式:每一輪迭代,Step函數的輸入或者是輸入的整個數據集,或者是上一輪迭代的結果,通過該輪迭代計算出下一輪計算所需要的輸入(也稱為Next Partial Solution),滿足迭代的終止條件後,會輸出最終迭代結果,具體執行流程如下圖所示:
Delta Iterate Operator實現了增量迭代,它的實現原理如下圖所示:
另外,Flink還提供了3個參數來配置Backpressure監控行為:
參數名稱
默認值
說明
jobmanager.web.backpressure.refresh-interval
60000
默認1分鍾,表示采樣統計結果刷新時間間隔
jobmanager.web.backpressure.num-samples
100
評估Backpressure狀態,所使用的堆棧跟蹤調用次數
jobmanager.web.backpressure.delay-between-samples
50
默認50毫秒,表示對一個Job的每個Task依次調用的時間間隔
通過上面個定義的Backpressure狀態,以及調整相應的參數,可以確定當前運行的Job的狀態是否正常,並且保證不影響JobManager提供服務。
參考鏈接
http://flink.apache.org/
http://flink.apache.org/features.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/general_arch.html
http://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/
https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/stream_checkpointing.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/job_scheling.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/event_time.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/yarn_setup.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/jobmanager_high_availability.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/libs/cep.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/libs/gelly.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/libs/ml/index.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/libs/table.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/connectors.html
http://geek.csdn.net/news/detail/56272
http://samza.apache.org/
『柒』 Flink內存管理
java所有數據類型對應的位元組大小
java對象的組成 : 對象頭,實例數據,對齊部分
jvm 序列化缺點
上面圖為TaskManager內存模型,左邊為細分的內存模型,右邊為整體內存模型,該圖摘自Flink官網
heap內存在jvm啟動的時候申請的一塊不變的內存區域,該內存實際上是Flink和task公用的一塊區域,在flink層面通過控制來區分框架使用和task內存,heap內存管理起來是比較容易的,實際上non-heap的內存是難管理的一塊,如果管理不當或者使用不當可能造成內存泄漏或者內存無限增長等問題
內存參數配置
在flink中對內存進行了抽象成了MemorySegment,�默認情況下,一個 MemorySegment 對應著一個 32KB 大小的內存塊,這塊內存既可以是堆上內存( byte數組) ,也可以是堆外內存(nio的ByteBufferr ) .
同時MemorySegment也提供了對二進制數據的操作方法,以及讀取位元組數組序列化以及序列化位元組數組的方法等
下面是類繼承圖,該類有兩MemorySegment實現類有兩個分別為使用heap的以及混合的即有heap和non-heap,對於內存的訪問有子類具體的實現
MemorySemgent是flink內存分配的最小單元了,對於數據誇MemorySemgent保存,那麼對於上層的使用者來說,需要考慮考慮所有的細節,由於過於繁瑣,所以在MemorySemgent上又抽象了一層內存也,內存也是在MemorySemgent數據訪問上的視圖,對數據輸入和輸出分別抽象為DataInputView/DataOutputView,有了這一層,上層使用者無需關心跨MemorySemgent的細節問題,內存也對自動處理跨MemorySemgent的內存操作
DataInputView
DataInputView繼承DataInput,DataInputView是對MemorySemgent讀取的抽象視圖,提供一系列讀取二進制數據不同類型的方法,AbstractPageInputView是DataInputView的一個抽象實現類,並且基本所有InputView都實現了該類,即所有實現該類的InputView都支持Page
InputView持有了多個MemorySemgent的引用(可以基於數組,list,deque等),這些MemorySemgent被視為一個內存頁,可以順序,隨機等方式讀取數據,要基於不同的實現類,實現類不同讀取方式不同
方法圖
DataOutputView
與DataInputView相對應,繼承Output,並有一個擁有Page功能的抽象類(AbstractPagedOutputView),其大部outputView的實現都是繼承自該抽象類,對一組MemorySemgent提供一個基於頁的寫入功能
方法圖
類繼承圖
用於網路io數據的包裝,每個buffer持有一個MemorySegment的引用,resultPartition寫數據的時候,會向LocalBufferPool申請Buffer,會返回BufferBuilder,通過BufferBuilder想Buffe r<實際寫入的是MemorySegment> 寫數據
BufferBuilder是在上游Task中,負責想Buffer寫入數據,BufferConsumer位於下游,與BufferBuilder相對應,用於消費Buffer的數據,每個bufferBuilder對應一個bufferConsumer
常用參數介紹
buffer申請
buffer回收
當buffer用完之後需要進行回收比如在netty的clientHandler收到響應之後進行處理就會把buffer回收掉,buffer回收之後並不會釋放memorySegment,而是放回池中,變為可用內存,反復使用
flink託管的內存,託管內存使用堆外內存,用於批處理緩存排序等以及提供rocksDB內存
NetworkBufferPool是一個固定大小的MemorySegment實例吃,用於網路棧中,NettyBufferPool會為每個ResultPartition創建屬於自己的LocalBufferPool,NettyBufferPool會作為全局的pool來提供內存,LocalBufferPool會通過限制來控制自己內存的申請,防止過多申請
LocalBufferPool繼承關系,實現了bufferRecycler的介面,用於回收自己持有的buffer
在數據接收的時候會將數據封裝成NettyBuffer,在數據發送的時候會通過BufferBilder向MemorySegment寫入數據,然後通過BufferConsumer讀取MemorySegment的數據
BufferManager主要用於為RemoteInputChannel提供buffer的,bufferManager在啟動的時候會向全局bufferPool請求自己的獨有buffer,當bufferManager的buffer不夠的時候,則會向localBufferPool請求buffer,此時請求的buffer為浮動buffer
實際上提供的buffer是
『捌』 flink 窗口數據存儲在哪裡
窗口數據信息一般會存儲在C盤。
因為這樣的數據信息屬於運行緩存數據信息,它會在C盤進行運行的存儲應用。
『玖』 基於flink sql構建實時數據倉庫
根據目前大數據這一塊的發展,已經不局限於離線的分析,挖掘數據潛在的價值,數據的時效性最近幾年變得剛需,實時處理的框架有storm,spark-streaming,flink等。想要做到實時數據這個方案可行,需要考慮以下幾點:1、狀態機制 2、精確一次語義 3、高吞吐量 4、可彈性伸縮的應用 5、容錯機制,剛好這幾點,flink都完美的實現了,並且支持flink sql高級API,減少了開發成本,可用實現快速迭代,易維護等優點。
離線數倉的架構圖:
實時數倉架構圖:
目前是將實時維度表和DM層數據存於hbase當中,實時公共層都存於kafka當中,並且以寫滾動日誌的方式寫入HDFS(主要是用於校驗數據)。其實在這里可以做的工作還有很多,kafka集群,flink集群,hbase集群相互獨立,這對整個實時數據倉庫的穩定性帶來一定的挑戰。
一個數據倉庫想要成體系,成資產,離不開數據域的劃分。所以參考著離線的數據倉庫,想著在實時數倉做出這方面的探索,理論上來講,離線可以實現的,實時也是可以實現的。 並且目前已經取得了成效,目前劃分的數據域跟離線大致相同,有流量域,交易域,營銷域等等。當然這裡面涉及到維表,多事務事實表,累計快照表,周期性快照表的設計,開發,到落地這里就不詳述了。
維度表也是整個實時數據倉庫不可或缺的部分。從目前整個實時數倉的建設來看,維度表有著數據量大,但是變更少的特點,我們試想過構建全平台的實時商品維度表或者是實時會員維度表,但是這類維度表太過於復雜,所以針對這類維度表下面介紹。還有另外一種就是較為簡單的維度表,這類維度可能對應著業務系統單個mysql表,或者只需要幾個表進行簡單ETL就可以產出的表,這類維表是可以做成實時的。以下有幾個實施的關鍵點:
如下是離線數據同步架構圖:
實時數據的接入其實在底層架構是一樣的,就是從kafka那邊開始不一樣,實時用flink的UDTF進行解析,而離線是定時(目前是小時級)用camus拉到HDFS,然後定時load HDFS的數據到hive表裡面去,這樣來實現離線數據的接入。實時數據的接入是用flink解析kafka的數據,然後在次寫入kafka當中去。
由於目前離線數據已經穩定運行了很久,所以實時接入數據的校驗可以對比離線數據,但是離線數據是小時級的hive數據,實時數據存於kafka當中,直接比較不了,所以做了相關處理,將kafka的數據使用flink寫HDFS滾動日誌的形式寫入HDFS,然後建立hive表小時級定時去load HDFS中的文件,以此來獲取實時數據。
完成以上兩點,剩餘還需要考慮一點,都是小時級的任務,這個時間卡點使用什麼欄位呢?首先要確定一點就是離線和實時任務卡點的時間欄位必須是一致的,不然肯定會出問題。目前離線使用camus從kafka將數據拉到HDFS上,小時級任務,使用nginx_ts這個時間欄位來卡點,這個欄位是上報到nginx伺服器上記錄的時間點。而實時的數據接入是使用flink消費kafka的數據,在以滾動日誌的形式寫入HDFS的,然後在建立hive表load HDFS文件獲取數據,雖然這個hive也是天/小時二級分區,但是離線的表是根據nginx_ts來卡點分區,但是實時的hive表是根據任務啟動去load文件的時間點去區分的分區,這是有區別的,直接篩選分區和離線的數據進行對比,會存在部分差異,應當的做法是篩選范圍分區,然後在篩選nginx_ts的區間,這樣在跟離線做對比才是合理的。
目前實時數據接入層的主要時延是在UDTF函數解析上,實時的UDTF函數是根據上報的日誌格式進行開發的,可以完成日誌的解析功能。
解析流程圖如下:
解析速率圖如下:
該圖還不是在峰值數據量的時候截的,目前以800記錄/second為准,大概一個記錄的解析速率為1.25ms。
目前該任務的flink資源配置核心數為1,假設解析速率為1.25ms一條記錄,那麼峰值只能處理800條/second,如果數據接入速率超過該值就需要增加核心數,保證解析速率。
介紹一下目前離線維度表的情況,就拿商品維度表來說,全線記錄數將近一個億,計算邏輯來自40-50個ods層的數據表,計算邏輯相當復雜,如果實時維度表也參考離線維度表來完成的話,那麼開發成本和維護成本非常大,對於技術來講也是很大的一個挑戰,並且目前也沒有需求要求維度屬性百分百准確。所以目前(偽實時維度表)准備在當天24點產出,當天的維度表給第二天實時公共層使用,即T-1的模式。偽實時維度表的計算邏輯參考離線維度表,但是為了保障在24點之前產出,需要簡化一下離線計算邏輯,並且去除一些不常用的欄位,保障偽實時維度表可以較快產出。
實時維度表的計算流程圖:
目前使用flink作為公司主流的實時計算引擎,使用內存作為狀態後端,並且固定30s的間隔做checkpoint,使用HDFS作為checkpoint的存儲組件。並且checkpoint也是作為任務restart以後恢復狀態的重要依據。熟悉flink的人應該曉得,使用內存作為狀態後端,這個內存是JVM的堆內存,畢竟是有限的東西,使用不得當,OOM是常有的事情,下面就介紹一下針對有限的內存,如果完成常規的計算。
『拾』 轉載:阿里巴巴為什麼選擇Apache Flink
本文主要整理自阿里巴巴計算平台事業部資深技術專家莫問在雲棲大會的演講。
合抱之木,生於毫末
隨著人工智慧時代的降臨,數據量的爆發,在典型的大數據的業務場景下數據業務最通用的做法是:選用批處理的技術處理全量數據,採用流式計算處理實時增量數據。在絕大多數的業務場景之下,用戶的業務邏輯在批處理和流處理之中往往是相同的。但是,用戶用於批處理和流處理的兩套計算引擎是不同的。
因此,用戶通常需要寫兩套代碼。毫無疑問,這帶來了一些額外的負擔和成本。阿里巴巴的商品數據處理就經常需要面對增量和全量兩套不同的業務流程問題,所以阿里就在想,我們能不能有一套統一的大數據引擎技術,用戶只需要根據自己的業務邏輯開發一套代碼。這樣在各種不同的場景下,不管是全量數據還是增量數據,亦或者實時處理,一套方案即可全部支持, 這就是阿里選擇Flink的背景和初衷 。
目前開源大數據計算引擎有很多選擇,流計算如Storm,Samza,Flink,Kafka Stream等,批處理如Spark,Hive,Pig,Flink等。而同時支持流處理和批處理的計算引擎,只有兩種選擇:一個是Apache Spark,一個是Apache Flink。
從技術,生態等各方面的綜合考慮。首先,Spark的技術理念是基於批來模擬流的計算。而Flink則完全相反,它採用的是基於流計算來模擬批計算。
從技術發展方向看,用批來模擬流有一定的技術局限性,並且這個局限性可能很難突破。而Flink基於流來模擬批,在技術上有更好的擴展性。從長遠來看,阿里決定用Flink做一個統一的、通用的大數據引擎作為未來的選型。
Flink是一個低延遲、高吞吐、統一的大數據計算引擎。在阿里巴巴的生產環境中,Flink的計算平台可以實現毫秒級的延遲情況下,每秒鍾處理上億次的消息或者事件。同時Flink提供了一個Exactly-once的一致性語義。保證了數據的正確性。這樣就使得Flink大數據引擎可以提供金融級的數據處理能力。
Flink在阿里的現狀
基於Apache Flink在阿里巴巴搭建的平台於2016年正式上線,並從阿里巴巴的搜索和推薦這兩大場景開始實現。目前阿里巴巴所有的業務,包括阿里巴巴所有子公司都採用了基於Flink搭建的實時計算平台。同時Flink計算平台運行在開源的Hadoop集群之上。採用Hadoop的YARN做為資源管理調度,以 HDFS作為數據存儲。因此,Flink可以和開源大數據軟體Hadoop無縫對接。
目前,這套基於Flink搭建的實時計算平台不僅服務於阿里巴巴集團內部,而且通過阿里雲的雲產品API向整個開發者生態提供基於Flink的雲產品支持。
Flink在阿里巴巴的大規模應用,表現如何?
規模: 一個系統是否成熟,規模是重要指標,Flink最初上線阿里巴巴只有數百台伺服器,目前規模已達上萬台,此等規模在全球范圍內也是屈指可數;
狀態數據: 基於Flink,內部積累起來的狀態數據已經是PB級別規模;
Events: 如今每天在Flink的計算平台上,處理的數據已經超過萬億條;
PS: 在峰值期間可以承擔每秒超過4.72億次的訪問,最典型的應用場景是阿里巴巴雙11大屏;
Flink的發展之路
接下來從開源技術的角度,來談一談Apache Flink是如何誕生的,它是如何成長的?以及在成長的這個關鍵的時間點阿里是如何進入的?並對它做出了那些貢獻和支持?
Flink誕生於歐洲的一個大數據研究項目StratoSphere。該項目是柏林工業大學的一個研究性項目。早期,Flink是做Batch計算的,但是在2014年,StratoSphere裡面的核心成員孵化出Flink,同年將Flink捐贈Apache,並在後來成為Apache的頂級大數據項目,同時Flink計算的主流方向被定位為Streaming,即用流式計算來做所有大數據的計算,這就是Flink技術誕生的背景。
2014年Flink作為主攻流計算的大數據引擎開始在開源大數據行業內嶄露頭角。區別於Storm,Spark Streaming以及其他流式計算引擎的是:它不僅是一個高吞吐、低延遲的計算引擎,同時還提供很多高級的功能。比如它提供了有狀態的計算,支持狀態管理,支持強一致性的數據語義以及支持Event Time,WaterMark對消息亂序的處理。
Flink核心概念以及基本理念
Flink最區別於其他流計算引擎的,其實就是狀態管理。
什麼是狀態?例如開發一套流計算的系統或者任務做數據處理,可能經常要對數據進行統計,如Sum,Count,Min,Max,這些值是需要存儲的。因為要不斷更新,這些值或者變數就可以理解為一種狀態。如果數據源是在讀取Kafka,RocketMQ,可能要記錄讀取到什麼位置,並記錄Offset,這些Offset變數都是要計算的狀態。
Flink提供了內置的狀態管理,可以把這些狀態存儲在Flink內部,而不需要把它存儲在外部系統。這樣做的好處是第一降低了計算引擎對外部系統的依賴以及部署,使運維更加簡單;第二,對性能帶來了極大的提升:如果通過外部去訪問,如Redis,HBase它一定是通過網路及RPC。如果通過Flink內部去訪問,它只通過自身的進程去訪問這些變數。同時Flink會定期將這些狀態做Checkpoint持久化,把Checkpoint存儲到一個分布式的持久化系統中,比如HDFS。這樣的話,當Flink的任務出現任何故障時,它都會從最近的一次Checkpoint將整個流的狀態進行恢復,然後繼續運行它的流處理。對用戶沒有任何數據上的影響。
Flink是如何做到在Checkpoint恢復過程中沒有任何數據的丟失和數據的冗餘?來保證精準計算的?
這其中原因是Flink利用了一套非常經典的Chandy-Lamport演算法,它的核心思想是把這個流計算看成一個流式的拓撲,定期從這個拓撲的頭部Source點開始插入特殊的Barries,從上游開始不斷的向下游廣播這個Barries。每一個節點收到所有的Barries,會將State做一次Snapshot,當每個節點都做完Snapshot之後,整個拓撲就算完整的做完了一次Checkpoint。接下來不管出現任何故障,都會從最近的Checkpoint進行恢復。
Flink利用這套經典的演算法,保證了強一致性的語義。這也是Flink與其他無狀態流計算引擎的核心區別。
下面介紹Flink是如何解決亂序問題的。比如星球大戰的播放順序,如果按照上映的時間觀看,可能會發現故事在跳躍。
在流計算中,與這個例子是非常類似的。所有消息到來的時間,和它真正發生在源頭,在線系統Log當中的時間是不一致的。在流處理當中,希望是按消息真正發生在源頭的順序進行處理,不希望是真正到達程序里的時間來處理。Flink提供了Event Time和WaterMark的一些先進技術來解決亂序的問題。使得用戶可以有序的處理這個消息。這是Flink一個很重要的特點。
接下來要介紹的是Flink啟動時的核心理念和核心概念,這是Flink發展的第一個階段;第二個階段時間是2015年和2017年,這個階段也是Flink發展以及阿里巴巴介入的時間。故事源於2015年年中,我們在搜索事業部的一次調研。當時阿里有自己的批處理技術和流計算技術,有自研的,也有開源的。但是,為了思考下一代大數據引擎的方向以及未來趨勢,我們做了很多新技術的調研。
結合大量調研結果,我們最後得出的結論是:解決通用大數據計算需求,批流融合的計算引擎,才是大數據技術的發展方向,並且最終我們選擇了Flink。
但2015年的Flink還不夠成熟,不管是規模還是穩定性尚未經歷實踐。最後我們決定在阿里內部建立一個Flink分支,對Flink做大量的修改和完善,讓其適應阿里巴巴這種超大規模的業務場景。在這個過程當中,我們團隊不僅對Flink在性能和穩定性上做出了很多改進和優化,同時在核心架構和功能上也進行了大量創新和改進,並將其貢獻給社區,例如:Flink新的分布式架構,增量Checkpoint機制,基於Credit-based的網路流控機制和Streaming SQL等。
阿里巴巴對Flink社區的貢獻
我們舉兩個設計案例,第一個是阿里巴巴重構了Flink的分布式架構,將Flink的Job調度和資源管理做了一個清晰的分層和解耦。這樣做的首要好處是Flink可以原生的跑在各種不同的開源資源管理器上。經過這套分布式架構的改進,Flink可以原生地跑在Hadoop Yarn和Kubernetes這兩個最常見的資源管理系統之上。同時將Flink的任務調度從集中式調度改為了分布式調度,這樣Flink就可以支持更大規模的集群,以及得到更好的資源隔離。
另一個是實現了增量的Checkpoint機制,因為Flink提供了有狀態的計算和定期的Checkpoint機制,如果內部的數據越來越多,不停地做Checkpoint,Checkpoint會越來越大,最後可能導致做不出來。提供了增量的Checkpoint後,Flink會自動地發現哪些數據是增量變化,哪些數據是被修改了。同時只將這些修改的數據進行持久化。這樣Checkpoint不會隨著時間的運行而越來越難做,整個系統的性能會非常地平穩,這也是我們貢獻給社區的一個很重大的特性。
經過2015年到2017年對Flink Streaming的能力完善,Flink社區也逐漸成熟起來。Flink也成為在Streaming領域最主流的計算引擎。因為Flink最早期想做一個流批統一的大數據引擎,2018年已經啟動這項工作,為了實現這個目標,阿里巴巴提出了新的統一API架構,統一SQL解決方案,同時流計算的各種功能得到完善後,我們認為批計算也需要各種各樣的完善。無論在任務調度層,還是在數據Shuffle層,在容錯性,易用性上,都需要完善很多工作。
篇幅原因,下面主要和大家分享兩點:
● 統一 API Stack
● 統一 SQL方案
先來看下目前Flink API Stack的一個現狀,調研過Flink或者使用過Flink的開發者應該知道。Flink有2套基礎的API,一套是DataStream,一套是DataSet。DataStream API是針對流式處理的用戶提供,DataSet API是針對批處理用戶提供,但是這兩套API的執行路徑是完全不一樣的,甚至需要生成不同的Task去執行。所以這跟得到統一的API是有沖突的,而且這個也是不完善的,不是最終的解法。在Runtime之上首先是要有一個批流統一融合的基礎API層,我們希望可以統一API層。
因此,我們在新架構中將採用一個DAG(有限無環圖)API,作為一個批流統一的API層。對於這個有限無環圖,批計算和流計算不需要涇渭分明的表達出來。只需要讓開發者在不同的節點,不同的邊上定義不同的屬性,來規劃數據是流屬性還是批屬性。整個拓撲是可以融合批流統一的語義表達,整個計算無需區分是流計算還是批計算,只需要表達自己的需求。有了這套API後,Flink的API Stack將得到統一。
除了統一的基礎API層和統一的API Stack外,同樣在上層統一SQL的解決方案。流和批的SQL,可以認為流計算有數據源,批計算也有數據源,我們可以將這兩種源都模擬成數據表。可以認為流數據的數據源是一張不斷更新的數據表,對於批處理的數據源可以認為是一張相對靜止的表,沒有更新的數據表。整個數據處理可以當做SQL的一個Query,最終產生的結果也可以模擬成一個結果表。
對於流計算而言,它的結果表是一張不斷更新的結果表。對於批處理而言,它的結果表是相當於一次更新完成的結果表。從整個SOL語義上表達,流和批是可以統一的。此外,不管是流式SQL,還是批處理SQL,都可以用同一個Query來表達復用。這樣以來流批都可以用同一個Query優化或者解析。甚至很多流和批的運算元都是可以復用的。
Flink的未來方向
首先,阿里巴巴還是要立足於Flink的本質,去做一個全能的統一大數據計算引擎。將它在生態和場景上進行落地。目前Flink已經是一個主流的流計算引擎,很多互聯網公司已經達成了共識:Flink是大數據的未來,是最好的流計算引擎。下一步很重要的工作是讓Flink在批計算上有所突破。在更多的場景下落地,成為一種主流的批計算引擎。然後進一步在流和批之間進行無縫的切換,流和批的界限越來越模糊。用Flink,在一個計算中,既可以有流計算,又可以有批計算。
第二個方向就是Flink的生態上有更多語言的支持,不僅僅是Java,Scala語言,甚至是機器學習下用的Python,Go語言。未來我們希望能用更多豐富的語言來開發Flink計算的任務,來描述計算邏輯,並和更多的生態進行對接。
最後不得不說AI,因為現在很多大數據計算的需求和數據量都是在支持很火爆的AI場景,所以在Flink流批生態完善的基礎上,將繼續往上走,完善上層Flink的Machine Learning演算法庫,同時Flink往上層也會向成熟的機器學習,深度學習去集成。比如可以做Tensorflow On Flink, 讓大數據的ETL數據處理和機器學習的Feature計算和特徵計算,訓練的計算等進行集成,讓開發者能夠同時享受到多種生態給大家帶來的好處。