1. Flink 指標(二)
通過 conf/flink-conf.yaml 文件配置一個或多個 Reporters 來暴露度量值給外部系統,這些 Reporter 將在作業和任務啟動的時候實例化。
所有的 Reporter 配置至少需要配置 class 屬性,還有一些允許配置記錄間隔。下面是一些 Reporter 的配置實例:
包含 Reporter 的 jar 必須放到 /lib 文件夾,這樣 Flink 就可以訪問到這些 jar。
可以通過繼承 org.apache.flink.metrics.reporter.MetricReporter 介面來實現自己的 Reporter,如果需要定期發送記錄,需要繼承 Scheled 介面。
下面是一些支持的 Reporter:
不需要添加額外的依賴就可以支持 JMX Reporter,默認是不激活的。
參數:
配置示例:
通過 JMX 公開的度量由域(domain)和鍵屬性列表(key-properties)標識,這些屬性一起構成對象名。
域始終以 org.apache.flink 開頭,後跟一個通用的度量標識符。與通常的標識符不同,它不受作用域格式的影響,不包含任何變數,並且在跨作業時也是常量。例子: org.apache.flink.job.task.numbytesout 。
鍵屬性列表包含與給定指標關聯的所有變數的值,無論配置的作用域格式如何。例子: host=localhost,job_name=myjob,task_name=mytask 。
因此,域標識一個度量類,鍵屬性列表標識該度量的一個(或多個)實例。
要使用此 Reporter,必須復制 /opt/flink-metrics-ganglia-1.6.1-SNAPSHOT.jar 到 Flink 的 /lib 文件夾下。
參數:
配置示例:
要使用此 Reporter,必須復制 /opt/flink-metrics-graphite-1.6.1-SNAPSHOT.jar 到 Flink 的 /lib 文件夾下。
參數:
配置示例:
要使用此 Reporter,必須復制 /opt/flink-metrics-prometheus-1.6.1-SNAPSHOT.jar 到 Flink 的 /lib 文件夾下。
參數:
配置示例:
Flink 度量類型映射到 Prometheus 度量類型,如下所示:
要使用此 Reporter,必須復制 /opt/flink-metrics-prometheus-1.6.1-SNAPSHOT.jar 到 Flink 的 /lib 文件夾下。
參數:
配置示例:
PrometheusPushGatewayReporter 將指標推送到 Pushgateway,可由 Prometheus 抓取。
要使用此 Reporter,必須復制 /opt/flink-metrics-statsd-1.6.1-SNAPSHOT.jar 到 Flink 的 /lib 文件夾下。
參數:
配置示例:
要使用此 Reporter,必須復制 /opt/flink-metrics-datadog-1.6.1-SNAPSHOT.jar 到 Flink 的 /lib 文件夾下。
Flink 指標,如任何變數 <host> , <job_name> , <tm_id> , <subtask_index> , <task_name> 和 <operator_name> ,將被發送到 Datadog 作為標簽。標簽看起來像 host:localhost 和 job_name:myjobname 。
參數:
配置示例:
要使用此 Reporter,必須復制 /opt/flink-metrics-slf4j-1.6.1-SNAPSHOT.jar 到 Flink 的 /lib 文件夾下。
配置示例:
Flink 默認會收集當前狀態的指標,下文的表格中包括以下5列:
請注意,「infix」 和 「Metrics」 列中所有的點根據 「metrics.delimiter」 設置變化。
因此,為了推斷指標的標識符:
Reference:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/metrics.html
2. 5 一文看完flink的內存管理
1)java對象的存儲密度比較低,對象主要包含 對象頭,對象數據,對齊填充。 其中對齊填充是沒用的,純粹是為了讓對象的大小到達8的倍數
2)Full GC非常影響性能,對大數據量的計算來說,fullGC可能會持續很久(秒級甚至分鍾級)
3)OOM導致JVM崩潰,因為是大數據計算,很有可能會分配出大的對象。
4)緩存未命中,CPU在進行計算時,會先從CPU的緩存中抓取數據,但是jvm堆上的內存不是連續的,會導致CPU緩存不命中,CPU空轉,影響效率。
5)傳輸過程,要序列化和反序列化
Flink將對象存儲在堆外內存中,或者存在 memorySegment上
memorySegment:
1 翻譯為內存段,存儲序列化後的對象
2 它是一段固定長度的內存(大小為32KB)
3 是FLink中最小的內存分配單元
4 讀寫非常高效,很多運算元可以直接操作其二進制數據,不需要反序列化
5 Java本身自帶的序列化和反序列化的功能,但是輔助信息佔用空間比較大,在序列化對象時記錄了過多的類信息。
Flink實現了自己的序列化框架,使用TypeInformation表示每種數據類型,所以可以只保存一份對象Schema信息,節省存儲空間。又因為對象類型固定,所以可以通過偏移量存取。
jobmanager.heap.size:1024m
jobmanager.memory.process.size:1600m
主要包含 堆內存和非堆內存,相對比較簡單一些。
關於rocksDb內存管理:
由於rocksdb分配的是堆外內存,內存量理論上不受jvm控制。於是產生問題,如果進程的內存使用超過容器限定的量,就會被資源管理器殺死。
3. Flink的處理機制以及側輸出應用
Flink EventTime和Watermark
https://www.jianshu.com/p/5e735b63fb5b
Flink只要不用時間窗口函數,就是基於事件處理,對於事件驅動的任務,我們需要關心的點,尤其是存在shuffle和聚合的時候:
<1> 是否存在數據傾斜
<2> 是否會存在某些節點狀態過大 (例如使用狀態時,不配置過期時間,那麼狀態會一直緩存,就會導致內容一直增加,帶來gc等問題)
基於窗口操作。
watermark機制,就可以理解為驅動flink基於事件時間處理的機制
一旦允許了元素滯後,那麼滯後元素在截止時間前到達後也會觸發計算並輸出結果。
注意:滯後元素觸發計算應該視為之前計算的更新結果,也即是相同計算會有多個結果。根據你的應用邏輯,要考慮是否會重復計算或者進行去重。
就是數據什麼時候計算並落地。
Triggers定義了何時開始使用窗口計算函數計算窗口。每個WindowAssigner都會有一個默認的Trigger。如果,默認的Trigger不能滿足你的需求,你可以指定一個自定義的trigger()。
抽象類Trigger:
onElement():進入窗口的每個元素都會調用該方法。
onEventTime():事件時間timer觸發的時候被調用。
onProcessingTime():處理時間timer觸發的時候會被調用。
onMerge():有狀態的觸發器相關,並在它們相應的窗口合並時合並兩個觸發器的狀態,例如使用會話窗口。
clear():該方法主要是執行窗口的刪除操作。
1).前三方法決定著如何通過返回一個TriggerResult來操作輸入事件。
CONTINUE:什麼都不做。
FIRE:觸發計算。
PURE:清除窗口的元素。
FIRE_AND_PURE:觸發計算和清除窗口元素。
2). 這些方法中的任何一個都可用於為將來的操作注冊處理或事件時間計時器
所有基於事件時間的窗口分配器都用EventTimeTrigger作為默認觸發器。該觸發器會在watermark達到窗口的截止時間(window.maxTimestamp())時直接觸發計算輸出。
注意:GlobalWindows的默認觸發器是NeverTrigger,這個是不會觸發計算的。因此,你需要為GlobalWindows自己定義一個觸發器。
注意:通過調用trigger()方法指定觸發器,就可以覆蓋掉默認的觸發器。例如,如果把TumblingEventTimeWindows的觸發器指定為CountTrigger,那麼就不會在根據時間進度觸發窗口計算了。如果想同時支持時間和計數進行計算觸發,那麼就需要你自定義觸發器了。
trigger()方法====》
在DataStream調用過窗口函數後,返回WindowedStream,就可以調用trigger()方法,指定新的Trigger替代默認Trigger。
窗口生命周期
簡單來說,窗口創建於屬於該窗口的第一個元素到達,結束於事件時間或者處理時間達到了窗口的結尾時間加上用戶自定義的允許延遲時間。
分流:使用split函數;
異常值捕獲:(比如空值,欄位缺失數據,異常大值等),滯後數據處理等可以用sideoutput。
場景舉例:
比如現在有一篇文章吧,單詞長度不一,但是我們想對單詞長度小於5的單詞進行wordcount操作,同時又想記錄下來哪些單詞的長度大於了5,那麼我們該如何做呢?
普遍的做法是:
datastream.filter(word.length>=5); //獲取不統計的單詞,也即是單詞長度大於等於5。
datastream.filter(word.length <5);// 獲取需要進行wordcount的單詞。
這樣數據,然後每次篩選都要保留整個流,然後遍歷整個流,顯然很浪費性能,假如能夠在一個流了多次輸出就好了, flink的側輸出提供了這個功能,側輸出的輸出(sideoutput)類型可以與主流不同,可以有多個側輸出(sideoutput),每個側輸出不同的類型。
在使用側輸出的時候需要先定義OutputTag
如:
OutputTag有兩個構造函數,上面例子構造函數只有一個id參數,還有一個構造函數包括兩個參數,id,TypeInformation信息。
要使用側輸出, 在處理數據的時候除了要定義相應類型的OutputTag外,還要使用特定的函數,主要是有四個 :
ProcessFunction、CoProcessFunction、
ProcessWindowFunction、ProcessAllWindowFunction
1.8之後添加了 KeyedProcessFunction
這里滯後數據的類型應該需要與DataStream中的類型保持一致。
4. Flink | Checkpoint 機制詳解
一、Checkpoint 簡介
Flink 的 Checkpoint 機制是其 可靠性 的基石。當一個任務在運行過程中出現故障時,可以根據 Checkpoint 的信息恢復到故障之前的某一狀態,然後從該狀態恢復任務的運行。 在 Flink 中,Checkpoint 機制採用的是 chandy-lamport (分布式快照)演算法,通過 Checkpoint 機制,保證了 Flink 程序內部的 Exactly Once 語義。
二、Checkpoint 機制流程詳解
1. 任務啟動
我們假設任務從 Kafka 的某個 Topic 中讀取數據,該Topic 有 2 個 Partition,故任務的並行度為 2。根據讀取到數據的奇偶性,將數據分發到兩個 task 進行求和。
某一時刻,狀態如下:
2.啟動Checkpoint
JobManager 根據 Checkpoint 間隔時間,啟動 Checkpoint。此時會給每個 Source 發送一個 barrier 消息,消息中的數值表示 Checkpoint 的序號,每次啟動新的 Checkpoint 該值都會遞增。
3. Source啟動Checkpoint
當Source接收到barrier消息,會將當前的狀態(Partition、Offset)保存到 StateBackend,然後向 JobManager 報告Checkpoint 完成。之後Source會將barrier消息廣播給下游的每一個 task:
4.task 接收 barrier
當task接收到某個上游(如這里的Source1)發送來的barrier,會將該上游barrier之前的數據繼續進行處理,而barrier之後發送來的消息不會進行處理,會被緩存起來。
之前對barrier的理解比較模糊,直到看到了下面這幅圖。barrier的作用和這里 "歡迎光臨" 牌子的作用類似,用於區分流中的數據屬於哪一個 Checkpoint:
我們可以理解為:barrier之前的數據屬於本次Checkpoint,barrier之後的數據屬於下一次Checkpoint,所以下次Checkpoint的數據是不應該在本次Checkpoint過程中被計算的,因此會將數據進行緩存。
5.barrier對齊
如果某個task有多個上游輸入,如這里的 sum_even 有兩個 Source 源,當接收到其中一個 Source 的barrier後,會等待其他 Source 的 barrier 到來。在此期間,接收到 barrier 的 Source 發來的數據不會處理,只會緩存(如下圖中的數據4)。而未接收到 barrier 的 Source 發來的數據依然會進行處理,直到接收到該Source 發來的 barrier,這個過程稱為 barrier的對齊 。
barrier是否對齊決定了程序實現的是 Exactly Once 還是 At Least Once:
如果不進行barrier對齊,那麼這里 sum_even 在接收 Source2 的 barrier 之前,對於接收到 Source1的 數據4 ,不會進行緩存,而是直接進行計算,sum_even 的狀態改為12,當接收到 Source2 的barrier,會將 sum_even 的狀態 sum=12 進行持久化。如果本次Checkpoint成功,在進行下次 Checkpoint 前任務崩潰,會根據本次Checkpoint進行恢復。此時狀態如下:
從這里我們就可以看出, Source1的數據4被計算了兩次 。因此,Exactly Once語義下,必須進行barrier的對齊,而 At Least Once語義下 barrier 可以不對齊。
注意:barrier對齊只會發生在多對一的Operator(如 join)或者一對多的Operator(如 reparation/shuffle)。如果是一對一的Operator,如map、flatMap 或 filter 等,則沒有對齊這個概念,都會實現Exactly Once語義,即使程序中配置了At Least Once 。
6.處理緩存數據
當task接收到所有上游發送來的barrier,即可以認為當前task收到了本次 Checkpoint 的所有數據。之後 task 會將 barrier 繼續發送給下游,然後處理緩存的數據,比如這里 sum_even 會處理 Source1 發送來的數據4. 而且,在這個過程中 Source 會 繼續讀取數據 發送給下游,並不會中斷。
7.上報Checkpoint完成
當sink收到barrier後,會向JobManager上報本次Checkpoint完成。至此,本次Checkpoint結束,各階段的狀態均進行了持久化,可以用於後續的故障恢復。
5. Flink 分布式緩存原理及使用
在1.9.1版本中分布式緩存並未拷貝HDFS下的文件到TM,運行時拋出如下異常。
升級到1.10.1版本,能正常使用。藉此,學習下Flink 分布式緩存相關知識。
官網對 distributed cache 的定義:
意思是通過Flink程序注冊一個本地或者Hdfs文件,程序在運行時,Flink會自動將該文件拷貝到每個tm中,每個函數可以通過注冊的名稱獲取該文件。
官網給出的使用案例:
參考flink1.10.1版本的源碼,了解實現流程。
StreamGraphGenerator-->StreamGraph
4. yarnPerjob 模式部署jobGraph時,如果是本地文件則上傳本地zip,返回該文件所在的hdfs路徑。如果緩存文件為hdfs已存在路徑,則直接寫入配置文件。
6. flink流處理特點
7. Flink 原理詳解
Flink 是一個流處理框架,支持流處理和批處理,特點是流處理有限,可容錯,可擴展,高吞吐,低延遲。
流處理是處理一條,立馬下一個節點會從緩存中取出,在下一個節點進行計算
批處理是只有處理一批完成後,才會經過網路傳輸到下一個節點
流處理的優點是低延遲 批處理的優點是高吞吐
flink同時支持兩種,flink的網路傳輸是設計固定的緩存塊為單位,用戶可以設置緩存塊的超時值來決定換存塊什麼時候進行傳輸。 數據大於0 進行處理就是流式處理。
如果設置為無限大就是批處理模型。
Flink 集群包括 JobManager 和 TaskManager .
JobManager 主要負責調度 Job 並協調 Task 做 checkpoint,職責上很像 Storm 的 Nimbus。從 Client 處接收到 Job 和 JAR 包 等資源後,會生成優化後的執行計劃,並以 Task 的單元調度到各個 TaskManager 去執行。
TaskManager 在啟動的時候就設置好了槽位數(Slot),每個 slot 能啟動一個 Task,Task 為線程。從 JobManager 處接收需要 部署的 Task,部署啟動後,與自己的上游建立 Netty 連接,接收數據並處理。
flink on yarn 是由client 提交 app到 RM 上, 然後RM 分配一個 AppMaster負責運行 Flink JobManager 和 Yarn AppMaster, 然後 AppMaster 分配 容器去運行 Flink TaskManger
SparkStreaming 是將流處理分成微批處理的作業, 最後的處理引擎是spark job
Spark Streaming把實時輸入數據流以時間片Δt (如1秒)為單位切分成塊,Spark Streaming會把每塊數據作為一個RDD,並使用RDD操作處理每一小塊數據。每個塊都會生成一個Spark Job處理,然後分批次提交job到集群中去運行,運行每個 job的過程和真正的spark 任務沒有任何區別。
JobScheler, 負責 Job的調度通過定時器每隔一段時間根據Dstream的依賴關系生一個一個DAG圖
ReceiverTracker負責數據的接收,管理和分配
ReceiverTracker在啟動Receiver的時候他有ReceiverSupervisor,其實現是ReceiverSupervisorImpl, ReceiverSupervisor本身啟 動的時候會啟動Receiver,Receiver不斷的接收數據,通過BlockGenerator將數據轉換成Block。定時器會不斷的把Block數據通會不斷的把Block數據通過BlockManager或者WAL進行存儲,數據存儲之後ReceiverSupervisorlmpl會把存儲後的數據的元數據Metadate匯報給ReceiverTracker,其實是匯報給ReceiverTracker中的RPC實體ReceiverTrackerEndpoin
spark on yarn 的cluster模式, Spark client 向RM提交job請求, RM會分配一個 AppMaster, driver 和 運行在AppMAster節點里, AM然後把Receiver作為一個Task提交給Spark Executor 節點, Receive啟動接受數據,生成數據塊,並通知Spark Appmaster, AM會根據數據塊生成相應的Job, 並把Job 提交給空閑的 Executor 去執行。
1:需要關注流數據是否需要進行狀態管理
2:At-least-once或者Exectly-once消息投遞模式是否有特殊要求
3:對於小型獨立的項目,並且需要低延遲的場景,建議使用storm
4:如果你的項目已經使用了spark,並且秒級別的實時處理可以滿足需求的話,建議使用sparkStreaming
5:要求消息投遞語義為 Exactly Once 的場景;數據量較大,要求高吞吐低延遲的場景;需要進行狀態管理或窗口統計的場景,建議使用flink
Flink 提供的Api右 DataStream 和 DataSet ,他們都是不可變的數據集合,不可以增加刪除中的元素, 通過 Source 創建 DataStream 和 DataSet
在創建運行時有:
Flink的每一個Operator稱為一個任務, Operator 的每一個實例稱為子任務,每一個任務在JVM線程中執行。可以將多個子任務鏈接成一個任務,減少上下文切換的開銷,降低延遲。
source 和 運算元map 如果是 one by one 的關系,他們的數據交換可以通過緩存而不是網路通信
TaskManager 為控制執行任務的數量,將計算資源劃分多個slot,每個slot獨享計算資源,這種靜態分配利於任務資源隔離。
同一個任務可以共享一個slot, 不同作業不可以。
這里因為 Source 和 Map 並行度都是4 採用直連方式,他們的數據通信採用緩存形式
所以一共需要兩個TaskManager source,Map 一個,rece一個, 每個TaskManager 要3個slot
JobManager 將 JobGraph 部署 ExecutionGraph
設置的並行度,可以讓一個ExecJobVertex 對應 多個並行的ExecVertex 實例。
Flink通過狀態機管理 ExecGraph的作業執行進度。
Flink 將對象序列化為固定數量的預先分配的內存段,而不是直接把對象放在堆內存上。
Flink TaskManager 是由幾個內部組件組成的:actor 系統(負責與 Flink master 協調)、IOManager(負責將數據溢出到磁碟並將其讀取回來)、MemoryManager(負責協調內存使用。
數據源:
Sink:
時間:
處理時間:取自Operator的機器系統時間
事件時間: 由數據源產生
進入時間: 被Source節點觀察時的系統時間
如果數據源沒有自己正確創建水印,程序必須自己生成水印來確保基於事件的時間窗口可以正常工作。。
DataStream 提供了 周期性水印,間歇式水印,和遞增式水印
8. 哪位好心人能提供個最新flink視頻學習教程,感謝
大數據教程flink從入門到精通
了解Flink,了解集群環境搭建運維,學習Flink中重要概念、原理和API的用法,通過知識點 + 案例教學法幫助小白快速掌握Flink。
課程內容:
1、Flink框架簡介
2、Flink集群搭建運維
3、Flink Dataset開發
4、Flink 廣播變數,分布式緩存,累加器
5、Flink Datastream開發
6、Flink Window操作
7、Flink watermark與側道輸出
8、Flink狀態計算
9、Flink容錯checkpoint與一致性語義
10、Flink進階 非同步IO,背壓,內存管理
11、Flink Table API與SQL
9. Flink的類載入器解析
在運行 Flink 應用程序時,JVM 會隨著時間的推移載入各種類。 這些類可以根據它們的來源分為三組:
作為一般規則,無論何時您先啟動 Flink 進程然後再提交作業,作業的類都會動態載入。 如果 Flink 進程與作業/應用程序一起啟動,或者如果應用程序產生 Flink 組件(JobManager、TaskManager 等),那麼所有作業的類都在 Java 類路徑中。
插件組件中的代碼由每個插件的專用類載入器動態載入一次。
以下是有關不同部署模式的更多詳細信息:
當作為獨立會話啟動 Flink 集群時,JobManagers 和 TaskManagers 使用 Java 類路徑中的 Flink 框架類啟動。 針對會話(通過 REST / CLI)提交的所有作業/應用程序中的類都是動態載入的。
Docker / Kubernetes 設置首先啟動一組 JobManagers / TaskManagers,然後通過 REST 或 CLI 提交作業/應用程序,其行為類似於獨立會話:Flink 的代碼位於 Java 類路徑中,插件組件和作業代碼在啟動時動態載入。
YARN 類載入在單個作業部署和會話之間有所不同:
當直接向 YARN 提交 Flink 作業/應用程序時(通過 bin/flink run -m yarn-cluster ...),將為該作業啟動專用的 TaskManager 和 JobManager。 這些 JVM 在 Java 類路徑中具有用戶代碼類。 這意味著在這種情況下,作業不涉及動態類載入。
當啟動一個 YARN 會話時,JobManagers 和 TaskManagers 是用 classpath 中的 Flink 框架類啟動的。 針對會話提交的所有作業的類都是動態載入的。
在涉及動態類載入的設置中(插件組件、會話設置中的 Flink 作業),通常有兩個類載入器的層次結構:(1)Java 的應用程序類載入器,它包含類路徑中的所有類,以及(2)動態插件/ 用戶代碼類載入器。 用於從插件或用戶代碼 jar 載入類。 動態 ClassLoader 將應用程序類載入器作為其父級。
默認情況下,Flink 反轉類載入順序,這意味著它首先查看動態類載入器,如果類不是動態載入代碼的一部分,則僅查看父類(應用程序類載入器)。
反向類載入的好處是插件和作業可以使用與 Flink 核心本身不同的庫版本,這在不同版本的庫不兼容時非常有用。 該機制有助於避免常見的依賴沖突錯誤,如 IllegalAccessError 或 NoSuchMethodError。 代碼的不同部分只是具有單獨的類副本(Flink 的核心或其依賴項之一可以使用與用戶代碼或插件代碼不同的副本)。 在大多數情況下,這運行良好,不需要用戶進行額外配置。
但是,在某些情況下,反向類載入會導致問題(請參閱下文,「X cannot be cast to X」)。 對於用戶代碼類載入,您可以通過在 Flink 配置中通過 classloader.resolve-order 將 ClassLoader 解析順序配置為 parent-first(從 Flink 的默認 child-first)來恢復到 Java 的默認模式。
請注意,某些類總是以父級優先的方式解析(首先通過父類載入器),因為它們在 Flink 的核心和插件/用戶代碼或面向插件/用戶代碼的 API 之間共享。 這些類的包是通過 classloader.parent-first-patterns-default 和 classloader.parent-first-patterns-additional 配置的。 要添加父級優先載入的新包,請設置 classloader.parent-first-patterns-additional 配置選項。
所有組件(JobManger、TaskManager、Client、ApplicationMaster 等)在啟動時記錄它們的類路徑設置。 它們可以作為日誌開頭的環境信息的一部分找到。
當運行 JobManager 和 TaskManagers 專用於一項特定作業的設置時,可以將用戶代碼 JAR 文件直接放入 /lib 文件夾中,以確保它們是類路徑的一部分而不是動態載入。
通常將作業的 JAR 文件放入 /lib 目錄中。 JAR 將成為類路徑(AppClassLoader)和動態類載入器(FlinkUserCodeClassLoader)的一部分。 因為 AppClassLoader 是 FlinkUserCodeClassLoader 的父級(並且 Java 載入父級,默認情況下),這應該導致類只載入一次。
對於無法將作業的 JAR 文件放入 /lib 文件夾的設置(例如因為安裝程序是由多個作業使用的會話),仍然可以將公共庫放入 /lib 文件夾,並避免動態為那些類進行載入。
在某些情況下,轉換函數、源或接收器需要手動載入類(通過反射動態載入)。 為此,它需要能夠訪問作業類的類載入器。
在這種情況下,函數(或源或接收器)可以成為 RichFunction(例如 RichMapFunction 或 RichWindowFunction)並通過 getRuntimeContext().getUserCodeClassLoader() 訪問用戶代碼類載入器。
在使用動態類載入的設置中,您可能會看到 com.foo.X cannot be cast to com.foo.X 樣式中的異常。 這意味著 com.foo.X 類的多個版本已被不同的類載入器載入,並且該類的類型試圖相互分配。
一個常見的原因是庫與 Flink 的反向類載入方法不兼容。 您可以關閉反向類載入來驗證這一點(在 Flink 配置中設置 classloader.resolve-order: parent-first)或從反向類載入中排除庫(在 Flink 配置中設置 classloader.parent-first-patterns-additional)。
另一個原因可能是緩存對象實例,如 Apache Avro 之類的某些庫或通過注冊(例如通過 Guava 的 Interners)生成的對象實例。 這里的解決方案是要麼在沒有任何動態類載入的情況下進行設置,要麼確保相應的庫完全是動態載入代碼的一部分。 後者意味著該庫不能被添加到 Flink 的 /lib 文件夾中,而必須是應用程序的 fat-jar/uber-jar 的一部分
所有涉及動態用戶代碼類載入(會話)的場景都依賴於再次卸載類。 類卸載意味著垃圾收集器發現類中不存在任何對象,因此刪除該類(代碼、靜態變數、元數據等)。
每當 TaskManager 啟動(或重新啟動)一個任務時,它將載入該特定任務的代碼。 除非可以卸載類,否則這將成為內存泄漏,因為載入了新版本的類,並且載入的類總數會隨著時間的推移而累積。 這通常通過 OutOfMemoryError: Metaspace 表現出來。
類泄漏的常見原因和建議的修復:
卸載動態載入類的一個有用工具是用戶代碼類載入器釋放鉤子。 這些是在卸載類載入器之前執行的鉤子。 通常建議關閉和卸載資源作為常規函數生命周期的一部分(通常是 close() 方法)。 但在某些情況下(例如對於靜態欄位),最好在不再需要類載入器時卸載。
類載入器釋放鉤子可以通過 RuntimeContext.() 方法注冊。
從應用程序開發人員的角度解決依賴沖突的一種方法是通過隱藏它們來避免暴露依賴關系。
Apache Maven 提供了 maven-shade-plugin,它允許在編譯後更改類的包(因此您編寫的代碼不受陰影影響)。 例如,如果您的用戶代碼 jar 中有來自 aws sdk 的 com.amazonaws 包,則 shade 插件會將它們重新定位到 org.myorg.shaded.com.amazonaws 包中,以便您的代碼調用您的 aws sdk 版本。
注意 Flink 的大部分依賴,比如 guava、netty、jackson 等,都被 Flink 的維護者屏蔽掉了,所以用戶通常不用擔心。
10. Flink內存管理
java所有數據類型對應的位元組大小
java對象的組成 : 對象頭,實例數據,對齊部分
jvm 序列化缺點
上面圖為TaskManager內存模型,左邊為細分的內存模型,右邊為整體內存模型,該圖摘自Flink官網
heap內存在jvm啟動的時候申請的一塊不變的內存區域,該內存實際上是Flink和task公用的一塊區域,在flink層面通過控制來區分框架使用和task內存,heap內存管理起來是比較容易的,實際上non-heap的內存是難管理的一塊,如果管理不當或者使用不當可能造成內存泄漏或者內存無限增長等問題
內存參數配置
在flink中對內存進行了抽象成了MemorySegment,�默認情況下,一個 MemorySegment 對應著一個 32KB 大小的內存塊,這塊內存既可以是堆上內存( byte數組) ,也可以是堆外內存(nio的ByteBufferr ) .
同時MemorySegment也提供了對二進制數據的操作方法,以及讀取位元組數組序列化以及序列化位元組數組的方法等
下面是類繼承圖,該類有兩MemorySegment實現類有兩個分別為使用heap的以及混合的即有heap和non-heap,對於內存的訪問有子類具體的實現
MemorySemgent是flink內存分配的最小單元了,對於數據誇MemorySemgent保存,那麼對於上層的使用者來說,需要考慮考慮所有的細節,由於過於繁瑣,所以在MemorySemgent上又抽象了一層內存也,內存也是在MemorySemgent數據訪問上的視圖,對數據輸入和輸出分別抽象為DataInputView/DataOutputView,有了這一層,上層使用者無需關心跨MemorySemgent的細節問題,內存也對自動處理跨MemorySemgent的內存操作
DataInputView
DataInputView繼承DataInput,DataInputView是對MemorySemgent讀取的抽象視圖,提供一系列讀取二進制數據不同類型的方法,AbstractPageInputView是DataInputView的一個抽象實現類,並且基本所有InputView都實現了該類,即所有實現該類的InputView都支持Page
InputView持有了多個MemorySemgent的引用(可以基於數組,list,deque等),這些MemorySemgent被視為一個內存頁,可以順序,隨機等方式讀取數據,要基於不同的實現類,實現類不同讀取方式不同
方法圖
DataOutputView
與DataInputView相對應,繼承Output,並有一個擁有Page功能的抽象類(AbstractPagedOutputView),其大部outputView的實現都是繼承自該抽象類,對一組MemorySemgent提供一個基於頁的寫入功能
方法圖
類繼承圖
用於網路io數據的包裝,每個buffer持有一個MemorySegment的引用,resultPartition寫數據的時候,會向LocalBufferPool申請Buffer,會返回BufferBuilder,通過BufferBuilder想Buffe r<實際寫入的是MemorySegment> 寫數據
BufferBuilder是在上游Task中,負責想Buffer寫入數據,BufferConsumer位於下游,與BufferBuilder相對應,用於消費Buffer的數據,每個bufferBuilder對應一個bufferConsumer
常用參數介紹
buffer申請
buffer回收
當buffer用完之後需要進行回收比如在netty的clientHandler收到響應之後進行處理就會把buffer回收掉,buffer回收之後並不會釋放memorySegment,而是放回池中,變為可用內存,反復使用
flink託管的內存,託管內存使用堆外內存,用於批處理緩存排序等以及提供rocksDB內存
NetworkBufferPool是一個固定大小的MemorySegment實例吃,用於網路棧中,NettyBufferPool會為每個ResultPartition創建屬於自己的LocalBufferPool,NettyBufferPool會作為全局的pool來提供內存,LocalBufferPool會通過限制來控制自己內存的申請,防止過多申請
LocalBufferPool繼承關系,實現了bufferRecycler的介面,用於回收自己持有的buffer
在數據接收的時候會將數據封裝成NettyBuffer,在數據發送的時候會通過BufferBilder向MemorySegment寫入數據,然後通過BufferConsumer讀取MemorySegment的數據
BufferManager主要用於為RemoteInputChannel提供buffer的,bufferManager在啟動的時候會向全局bufferPool請求自己的獨有buffer,當bufferManager的buffer不夠的時候,則會向localBufferPool請求buffer,此時請求的buffer為浮動buffer
實際上提供的buffer是