❶ Flink sql實戰演練之自定義Clickhouse Connector
簡介:實時數倉目前的架構是flink+clickhouse,社區目前jdbc connector不支持clickhouse的方言,所以決定自定義clickhouse connector實現flink sql寫入數據到clickhouse。
目前想要實現flink sql數據落地到ck,可以修改jdbc connector的源碼,增加ck方言,或者採用阿里提供的ck connector包,為了更好的理解flink connector的原理,這里自定義connector實現。
目前支持Flink寫入Clickhouse的依賴哭比較多,如果數據格式固定,可以CSV的方式寫入,如果不固定,可採用Json的方式寫入。
❷ Flink SQL 知其所以然(五)| 自定義 protobuf format
protobuf 作為目前各大公司中最廣泛使用的高效的協議數據交換格式工具庫,會大量作為流式數據傳輸的序列化方式,所以在 flink sql 中如果能實現 protobuf 的 format 會非常有用( 目前社區已經有對應的實現,不過目前還沒有 merge,預計在 1.14 系列版本中能 release )。
issue 見: https://issues.apache.org/jira/browse/FLINK-18202?filter=-4&jql=project%20%3D%20FLINK%20AND%20issuetype%20%3D%20%22New%20Feature%22%20AND%20text%20~%20protobuf%20order%20by%20created%20DESC
pr 見: https://github.com/apache/flink/pull/14376
這一節主要介紹 flink sql 中怎麼自定義實現 format ,其中以最常使用的 protobuf 作為案例來介紹。
如果想在本地直接測試下:
關於為什麼選擇 protobuf 可以看這篇文章,寫的很詳細:
http://hengyunabc.github.io/thinking-about-grpc-protobuf/?utm_source=tuicool&utm_medium=referral
在實時計算的領域中,為了可讀性會選擇 json ,為了效率以及一些已經依賴了 grpc 的公司會選擇 protobuf 來做數據序列化,那麼自然而然,日誌的序列化方式也會選擇 protobuf 。
而官方目前已經 release 的版本中是沒有提供 flink sql api 的 protobuf format 的。如下圖,基於 1.13 版本。
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/
因此本文在介紹怎樣自定義一個 format 的同時,實現一個 protobuf format 來給大家使用。
預期效果是先實現幾種最基本的數據類型,包括 protobuf 中的 message (自定義 model)、 map (映射)、 repeated (列表)、其他基本數據類型等,這些都是我們最常使用的類型。
預期 protobuf message 定義如下:
測試數據源數據如下,博主把 protobuf 的數據轉換為 json,以方便展示,如下圖:
預期 flink sql:
數據源表 DDL:
數據匯表 DDL:
Transform 執行邏輯:
下面是我在本地跑的結果:
可以看到列印的結果,數據是正確的被反序列化讀入,並且最終輸出到 console。
目前業界可以參考的實現如下: https://github.com/maosuhan/flink-pb , 也就是這位哥們負責目前 flink protobuf 的 format。
這種實現的具體使用方式如下:
其實現有幾個特點:
[圖片上傳失敗...(image-66c35b-1644940704671)]
其實上節已經詳細描述了 flink sql 對於 sourcesinkformat 的載入機制。
如圖 serde format 是通過 TableFactoryHelper.discoverDecodingFormat 和 TableFactoryHelper.discoverEncodingFormat 創建的
所有通過 SPI 的 sourcesinkformt 插件都繼承自 Factory 。
整體創建 format 方法的調用鏈如下圖。
最終實現如下,涉及到了幾個實現類:
具體流程:
上述實現類的具體關系如下:
介紹完流程,進入具體實現方案細節:
ProtobufFormatFactory 主要創建 format 的邏輯:
resourcesMETA-INF 文件:
主要實現反序列化的邏輯:
可以注意到上述反序列化的主要邏輯就集中在 runtimeConverter 上,即 ProtobufToRowDataConverters.ProtobufToRowDataConverter 。
ProtobufToRowDataConverters.ProtobufToRowDataConverter 就是在 ProtobufToRowDataConverters 中定義的。
ProtobufToRowDataConverters.ProtobufToRowDataConverter 其實就是一個 convertor 介面:
其作用就是將 protobuf message 中的每一個欄位轉換成為 RowData 中的每一個欄位。
ProtobufToRowDataConverters 中就定義了具體轉換邏輯,如截圖所示,每一個 LogicalType 都定義了 protobuf message 欄位轉換為 flink 數據類型的邏輯:
源碼後台回復 flink sql 知其所以然(五)| 自定義 protobuf format 獲取。
本文主要是針對 flink sql protobuf format 進行了原理解釋以及對應的實現。
如果你正好需要這么一個 format,直接後台回復 flink sql 知其所以然(五)| 自定義 protobuf format 獲取源碼吧。
當然上述只是 protobuf format 一個基礎的實現,用於生產環境還有很多方面可以去擴展的。
❸ Kylin 在騰訊的平台化及 Flink 引擎實踐
首先,介紹下我們為什麼進行平台化改造?
我們部門為公司內其他業務線提供了各種大數據平台,如 Kylin、HBase、Spark、Flink 等等,提供公共統一的平台系統勢必會牽扯到用戶管理、資源隔離、部門內各個平台的融合等問題,而 Kylin 現有的用戶管理、資源隔離機制並不能滿足我們需求,基於此,我們對 Kylin 進行了平台化改造。平台化改造完成後,我希望在以下幾個方面,能夠有一些改進:
1. 用戶管理
為了便於系統的管理及安全,公司內部有一套自己的認證系統,而且需要用個人賬號去驗證,所以 Kylin 作為一個平台對外提供服務的話,也需要接入到該系統。所以,我們新增了一個用戶管理界面,該界面展示了 Kylin 平台內的所有用戶。管理員可以新增任一用戶到 Kylin 平台,新增用戶時會填寫企業微信名、用戶角色以及是否激活用戶。當用戶登錄系統時,會自動檢測用戶賬號以及該賬號是否在平台內注冊,如果沒有注冊則無許可權,反之自動登錄系統。
2. 內部 Hive 兼容
由於 歷史 原因,我們部門內的 Hive 版本(THive)與 Kylin 不兼容,這就導致 Kylin 無法正常訪問 Kylin 集群,所以我們採用了上圖所示的兼容方案。首先,我們使用社區 Hive 版本搭建一個全新的 Hive,並作為 Kylin 的默認 Hive;其次,當 kylin 載入源表時,我們是通過內部的 UPS 系統讀取 THive 的元數據信息;最後,在 Load 源表到 Kylin 時,我們根據表的元數據信息在 Kylin 的 Hive 上創建一張相同的表,但該表的存儲路徑依舊指向 THive 的路徑,而用戶在構建 cube 時,則訪問新創建的表,至此就解決了 Kylin 訪問 THive 的問題。
3. 計算資源可配置化
目前,Kylin 配置計算資源信息有兩種方式:一是在 Kylin 配置文件中配置一個全局的計算集群及隊列;二是在創建工程或者 Cube 時,在擴展參數中指定集群配置。這兩種配置方式在靈活性及便捷性方面都比較差,而在我們內部是有介面可以獲取到某一個用戶有計算資源的計算集群及計算隊列的,所以,在創建工程或者 Cube 時,我們使用了下拉框選擇式的方式,讓用戶選擇提交任務的計算資源及隊列,從而大大簡化了用戶的使用流程。
4. 通知機制
Kylin 只提供了發郵件通知的功能,而作為目前使用最廣泛的工具,微信、企業微信在實時性及便捷性方面都遠遠勝於郵件,所以,我們提供了郵件、微信、企業微信三種方式,供用戶選擇。
5. 定時調度
Kylin 系統自身並沒有提供定時調度功能,但基本上每家公司都有自己的統一調度平台,我們也不例外。我們通過 Kylin 提供的API介面,將 Cube 定時構建的功能作為一個插件集成到了公司內部的統一調度平台上。
6. 業務接入
做完以上平台化改造後,Kylin 平台基本具備了接入不同類型業務的能力,用戶申請接入流程如上圖所示。
業務使用情況:
我們團隊是在今年初才開始引入 Kylin,目前已經在使用的業務主要有 QQ 音樂、騰訊視頻、廣點通、財付通等,Cube 的數量有 10 個,單份數據存儲總量是 5 T,數據規模在 30 億條左右。
Flink Cube Engine 原理及實踐
目前,Kylin 已經支持使用 MapRece 和 Spark 作為構建引擎,而作為目前比較火的流批一體的大數據計算引擎怎能缺席?所以我使用 Flink 開發了一個高性能的構建引擎:Flink Cube Engine。
Flink Cube Engine 是騰訊基於 Kylin 插件化的 Cube Engine 架構開發的一個高性能構建引擎,目前已具備了上線使用的能力,感興趣的同學可以體驗一下,目前該引擎已經在騰訊生產環境上線 1 個月+,非常穩定而且效果不錯。
Umbrella issue:
https://issues.apache.org/jira/browse/KYLIN-3758
分支:
https://github.com/apache/kylin/tree/engine-flink
1. 支持 Flink Engine 的子任務
Kylin 的一次 Cube 構建任務,包含了很多個子任務,而最重要的莫過於 Cube 構建這一步驟,所以,我們在 build 和 merge Cube 這兩種任務中,優先實現了Cube 構建這一步驟,其他計算步驟依舊通過使用 MapRece 來實現。
2. 如何使用 Flink Cube Engine
選擇使用 Flink Cube Engine 的方式也和選擇 Map Rece 和 Spark 任務類似,我們提供了前台可視化的界面,供用戶選擇。
3. Flink Cube Engine 與 Spark (線上業務)
上圖是我們內部業務上線 Flink Cube Engine 之後的性能對比,從圖中可見,該步驟的構建耗時從 49 分鍾降到了 13 分鍾,優化效果比較明顯。兩種情況的資源配置如下:
Flink 配置為:
-ytm 4G -yjm 2G -ys 1 -p 100 -yn 100
Spark 採用的動態分配資源如下:
kylin.engine.spark-conf.spark.dynamicAllocation.enabled=true
kylin.engine.spark-conf.spark.dynamicAllocation.minExecutors=2
kylin.engine.spark-conf.spark.dynamicAllocation.maxExecutors=1000
kylin.engine.spark-conf.spark.dynamicAllocation.executorIdleTimeout=300
kylin.engine.spark-conf.spark.shuffle.service.enabled=true
kylin.engine.spark-conf.spark.shuffle.service.port=7337
雖然,Spark 採用的是動態分配資源,但在任務執行過程中,我們觀察到 Spark實際分配的資源遠比 Flink 要多的多。
那為什麼性能提升會那麼明顯呢?
4. Flink Cube Engine 的優化
性能的提升,無非有兩方面的原因,一是參數的優化,二是代碼的優化。
1) 調參
影響 Flink 任務性能主要有幾個核心參數:並行度、單個 TM slot 數目、TM container 數目,其中單個 TM container 數目=並行度/單個 TM slot 數目。
我們調優的過程採用了控制變數法,即:固定並行度不變、固定 Job 總內存數不變。通過不斷的調整單個 TM 的 slot 數目,我們發現 如果單個 TM 的 slot 數目減少,拉起更多的 TM container 性能會更好。
此外,我們還使用了對象復用、內存預分配等方法,發現沒有對性能提升起到太大的效果。
2) 代碼優化(合並計算)
在實現 Flink Cube Engine 的時候,一開始我們使用了 Map/Rece 兩個運算元,發現性能很差,比 Spark 的性能還要差很多,後來我們通過調整使用了 Flink 的 mapPartition/receGroup 兩個運算元,性能就有了明顯的提升。
Flink Cube Engine 下一步的計劃:
1. 全鏈路 Flink
如上所述,目前 Cube 構建過程中,只有最關鍵的 cube 構建這一子任務使用了 Flink,而其他子任務仍然使用的是 MapRece,我們下一步會繼續完善 Flink Cube Engine,將所有的子任務都使用 Flink 來構建。
2. Flink 升級到 1.9
Flink 最近發布了 1.9.0,該版本包含了很多重要特性且性能也有了一定提升,所以,我們會把 Flink Cube Engine 使用的 Flink 版本升級到1.9.0。
❹ Flink SQL實戰演練之自定義Table Format
簡介:接著上次Flink CDC繼續聊,提到這塊,不得不重點說下canal-json format了,canal json format對json format進行了封裝,負責把binlog形式的json數據轉化成了Flink能夠識別的RowData數據,當然也包括數據的描述信息封裝類RowType。筆者想實現根據RowKind進行數據的過濾,目前可以通過修改canal format的源數據來實現,也可以通過將changelog流以changelog json的形式回寫Kafka。
基於目前對table format的了解,這里自定義event json format,用來處理事件流數據,因為事件流欄位不固定,可能只有少部分欄位是固定的,其他欄位都是擴展的,所以筆者想實現用戶自定義schema指定公共欄位,然後其他欄位以json的行為存在metadata中的default欄位中。
Table Format作為Connector組件單獨用於序列化和反序列化內部數據的模塊而單獨存在,多個Connector可以公用。自定義Table Format可以讓大家更好的理解Flink SQL時如何將外部數據轉化為內部可以識別的RowData數據結構的,從而在排查問題的時候能准確定位到具體位置。
❺ Flink SQL 寫入 Hive表的性能問題
寫入Hive表的性能,每秒寫入記錄數,發現性能並不樂觀,上有節點背壓嚴重。
Hive Table DDL:
而寫入HDFS文件的性能,每秒寫入記錄數,性能符合期待。
HDFS文件的DDL:
翻閱Flink的PR,十幾天前,阿里Flink的開發同學已經注意到了這個問題,我們將之吸收到測試環境,編譯替換lib下jar包,重新測試,性能確實up了,單並發升至5W每秒,上游節點才稍微有背壓。
[FLINK-19121][hive] Avoid accessing HDFS frequently in HiveBulkWriterFactory
所以,Flink的新特性從發布到應用線上,穩定性與性能上都不能過於樂觀、聽信於官方宣傳,
司內另一教訓就是過早在熱數據存儲層啟用了Hadoop的糾刪碼,導致問題不斷,被迫退化到副本機制。
這與前期調研、驗證不足,對該特性過於輕信有莫大關系,教訓也是深刻。
底層採用Reed-Solomon(k,m)演算法,RS是一種常用的糾刪碼演算法,通過矩陣運算,可以為k位數據生成m位校驗位,根據k和m的取值不同,可以實現不同程度的容錯能力,是一種比較靈活的糾刪碼演算法。
HDFS糾刪碼技術能夠降低數據存儲的冗餘度,以RS(3,2)為例,其數據冗餘度為67%,相比Hadoop默認的200%大為減少。但是糾刪碼技術存儲數據和數據恢復都需要 消耗cpu進行計算,實際上是一種以時間換空間的選擇,因此比較適用的場景是對冷數據的存儲 。冷數據存儲的數據往往一次寫入之後長時間沒有訪問,這種情況下可以通過糾刪碼技術減少副本數。
❻ flink1.12.1擴展flink-sql 支持寫入到sqlserver
目前業務上有同步數據到sqlServer的需求,但是flink1.12.1版本的JdbcDialects不支持SqlServerDialect,
科學上網後發現袋鼠雲的flinkStreamSql已經有支持sqlserver,那就開始動手,參考實現一波
主要實現getUpsertStatement的方法,本來以為能直接一波flinkStreamSql 的實現,結果發現
報錯 SQL statement must not contain ? character.
查看源碼發現, flink在構建mysql的Statement,是先把需要替換的欄位前面拼接了 : fieldNames,然後在org.apache.flink.connector.jdbc.statement.類的parseNamedStatement 替換成 ?號, 既然如此,就針對了buildDualQueryStatement進行修改
完整的SqlServerDialect文件
最後替換原有的flink-jar包後,就可以用類似flink定義mysql的ddl進行定義表了
注意url寫法為:jdbc:jtds:sqlserver://xxx:1433;databaseName=master;
[flinkStreamSQL鏈接] https://github.com/DTStack/flinkStreamSQL/blob/1.11_release/sqlserver/sqlserver-sink/src/main/java/com/dtstack/flink/sql/sink/sqlserver/SqlserverDialect.java
❼ 流批一體不只有Flink,還有實時數據模型
通常來講,數據倉庫的建設,都是以離線作為主要的密報,下游的應用,不論是報表還是介面,所提供的數據也大多是T-1時效性。
但伴隨著業務的變化,當離線做到沒什麼可以繼續做的時候,實時就會被拿出來,作為新一個階段的目標進行攻克。
在流批一體建設之前,這種實時訴求通常會開發成分鍾級的任務,通過近實時的方案來解決業務的問題,但分鍾級會帶來諸如任務過多、資源擠占較大、無法支持復雜邏輯等問題。
因此專門支持實時計算的框架,比如早期的Storm,能夠嘗試從純實時的角度解決業務問題,就被拿出來作為嘗試。然而Storm的局限性也很大,因為那會的任務開發只能通過Java的方式來進行,與Hive所推崇的純SQL方案相比,上手難度大了不少,同時兩套代碼的邏輯幾乎沒有可比性,這種方案也就一直沒有什麼聲音。
盡管實時技術有各種缺陷,但作為一種能夠很容易講清楚價值的項目,同時又非常便於向上匯報的技術方案,實時技術還是被或多或少的做了起來。在大多數的公司里,實時和離線就會有不同的團隊進行維護,或者是同一個團隊,但分成不同的項目來執行。這個階段,優先高效的把業務做起來,哪怕場景再簡單,但能夠證明實時有價值和前景,這個階段的目標就算完成了。
以上的各種方案,難免會帶來三個特別難以解決的問題:
(1)數據的口徑上,實時和離線很容易不統一;
(2)數據模型的規范上,實時和離線也往往是分開建設;
(3)即便是同一種口徑和同一種規范,實時和離線也要分成兩套代碼來維護。
這三個問題短時間內會被高速發展掩蓋掉,但當業務對實時的訴求越來越多、壓力越來越大的時候,口徑和代碼的不統一,就會越來越成為阻礙敏捷開發的障礙,需要有方案進行解決。
後來Flink出現了,帶來了流批一體的全新方案,這個問題便出現了解決的曙光,這也比較接近我們對於實時計算的理想方案,因為其意義堪比Hive,也成為了各個大廠面試的標配問題。
然而,僅僅學會Flink是不夠的,因為流批一體帶來的並不僅僅是技術方案或者是框架的改變,同樣帶來了數據模型的改變,這就要求我們從數據模型上,而不是技術方案上,來制定我們的實時方案。
那麼我們如何理解「實時數據模型」這件事情呢?
通常而言,我們關心的內容,包括如下幾個方面:
(1)實時數據源與離線數據源存在差異,導致相同的欄位,取值或者類型會存在不相等的情況;
(2)實時和離線由於底層執行機制的不同,通常需要維護兩套代碼,會帶來諸如口徑不統一、質量檢測難的問題;
(3)產品邏輯變化較快時,離線模型修改相對容易,但實時模型需要考慮壓測、削峰、重啟等技術問題,維護成本非常高昂。
數據倉庫之所以能夠普及並被業務接受,正是因為其模型能夠屏蔽掉底層差異的問題,並且有相對可靠的數據質量監控方法,並且變更成本非常低。而實時數倉如果想要替代掉離線數倉,以上的問題通常是需要一些模型設計甚至是平台工具的來解決,這些問題解決的重要性,並不比Flink弱。
我們先從比較可控的模型層面說起。
在離線的概念里,數倉模型設計成了DWD/DWS/ADS三個層級,原本的概念是DWD面向事實表的構建,DWS面向公共指標的統一,ADS負責靈活的口徑變化問題。
在離線的概念里,DWD/DWS/ADS三個層級需要保留,但負責的目標會有一些變化,同時還需要增加存儲統一層,也就是以TiDB/Holo為代表的資料庫,來承擔服務分析一體化的訴求。
讓我們先看DWD層,DWD承擔了屏蔽實時離線鏈路差異的問題,最重要的作用是保證表結構的統一及欄位內容的對齊。DWD最重要的意義,是保證離線表和實時表,其表結構和欄位概念是相同的。
為什麼這么強調?試想一下,在離線場景下,我們可以在DWD上靈活的增加各種統計標簽,或者是將維度退化到事實表,都是一些left join或者是服務端直接打標可以解決的事情。但在實時場景下,這會變成多流join或者是緩存等更復雜的技術場景,導致這些信息並不能有效的記錄到DWD,因此DWD的設計就要產生一些變化,有一些內容在實時場景下無法准確記錄,這一類信息需要標識到對應的欄位描述上,下游使用時才不會出錯。
同時,實時和離線存儲數據的介質,也必然有一些區別。例如離線可以存在HDFS上,實時則可能視情況保存在資料庫、HDFS甚至是內存中,這時候對於欄位格式、讀取方式都會有差異,設計表時其約束條件也會更多。
因而,DWD更多承擔了邏輯統一的職責,依舊以事實表為基礎,但約束條款要比離線更多。
再看一下DWS層,離線上DWS是負責口徑統一的重要一環,將通用的維度和口徑計算方法抽象出現,以提供跨數據域的靈活使用。但在實時場景下,這一類的維護收益通常都比較低,不僅因為實時只看當天的數據,也是因為實時本身的維度難度就較大,多一層模型其收益會急速下降,因而大多數時候會忽略掉DWS的建設,ADS直接引用DWD進行統計。
然而,DWS畢竟存儲的內容要比DWD少很多,因此如果計算資源瓶頸非常明顯,或者是業務場景不需要分析實時明細數據的情況下,或者是DWD的下游引用過多時,DWS可以承擔削峰的重任,通過減少數據量以應對大促等場景,還是有一定意義的。
接下來就是最重要的ADS層,在這一層上,邏輯統一、口徑統一、大促削峰在前置模型上都得到了一定程度的解決,ADS則像離線一樣承擔了應對需求變化的重任。
但ADS所面臨的情況和離線還是有所不同的,因為ADS的任務啟動,不僅要啟動一個離線的跑批任務,還要同時啟動一個實時的流式任務,而ADS往往會同時統計離線+實時的結果,以應對同比、環比等場景。
這時候很多具體Case要具體分析了,因為特定場景的坑會非常多。例如最常見的「同比」,要對比今年和去年的結果變化,離線往往會統計分小時的結果,但實時會累計起始時刻到當期時刻的結果,因而當一個小時沒有結束的時候,這個同比的波動變化會非常大,給人一種「數據是錯誤的」印象,新手很容易踩這個坑,從而被業務質疑。
因此,針對累計統計指標,從代碼設計上就要考慮到這種情況,都根據時間欄位統計起始到當前時刻的結果的,在代碼邏輯上會要求一些統計技巧。
很多時候,因為業務指標變化太快,改實時代碼是來不及的,這時候一部分的工作量甚至需要報表工具的數據集來解決,改動查詢sql,要比改動任務來的快捷多了。但這部分的能力,其實是依賴於存儲工具的,個人認為可以分到存儲統一層來解決。
最後是存儲統一層,因為一些特殊的場景,比如實時分析明細數據,或者是不確定時間周期的多天統計結果,如果依賴Flink SQL來解決是有些不現實的,因而這部分的壓力需要資料庫來承擔。
簡單講,就是將明細做輕度的匯總後,直接寫到資料庫,實時更新,下游自定義條件,並直接讀庫統計結果。這種場景既要求資料庫有OLAP的計算能力,也要有OLTP的穩定特點,因而TiDB和Holo這一類HTAP的引擎就變得非常重要。
因為多了實時的部分,因此過去面向離線的開發工具,也需要有一些特定的改造,以適應實時的開發和運維訴求。
對於開發工具而言,其目標集中在四個場景上:元數據定義與獲取、數據建模、開發與測試、運維與監控。
其次講數據建模,因為建模的理論已經穩定了有些年頭了,絕大多數場景下都是按照既定的方案來執行。過去離線當道時,規范執行的弱一些不是什麼大問題,但流批一體當道的年代,規范是需要強約束的,這就對了開發工具提出了一定的要求,是否能夠從平台層面上對規范進行內置,並以此來約束開發的同學,降低不規范模型對後期維護帶來的壓力。
這種建模能力的代表有兩種,一種是規范表的命名,填寫相應的分層+主題域+數據域+統計刷新方式,從源頭上規范表的目標和作用;一種是規范指標的定義和使用,例如原子指標還是派生指標,統計周期多少,業務限定用語如何規范,統計粒度怎麼填寫。
在實際開發中,通過工具的限制,如果規范可以做的好,代碼是可以自動生成出來的。當然,以上的功能,都屬於通過犧牲開發效率,來提升數據質量的范疇,使用時需要根據團隊的情況來限定。
再次是開發和測試,這是平台提供的最重要的能力。在開發層面,就是代碼的預編譯能力+發布功能。預編譯不僅要檢查代碼的邏輯是否正確,同時對於代碼中依賴的其他數據源,獲取到的元數據信息是否准確,至少欄位的命名不會有大的問題。當代碼預編譯通過,發布上線後,還需要檢測當前是否有資源支持任務啟動,並且上游的消息隊列是否是啟動的狀態。
實時的測試一直都是比較大的問題,它不像離線可以啟動一個SQL任務看看結果,實時在每個階段的輸入和輸出,是需要通過平台支持的日誌列印功能來進行輔助的。很多時候我們會新建一個測試專用的topic來測試結果,但對於流量較大的線上任務而言,這種方式無法像離線區分Dev環境一樣,能夠對資源進行隔離,因而如果能夠支持圈定數據的輸入和列印輸出,對於測試的效率而言無疑是最佳的。
最後要提到的是運維與監控能力。運維能力是指根據輸入的RPS,或者是cu使用情況,或者是任務的整體延遲,提供相應的參數調優能力,通過參數來調整任務的執行情況。並且能夠根據以上指標的變化,自定義相應的閾值,提供相應的告警能力,通過簡訊或者是消息工具的方式觸達任務維護者。
實時與離線有一些不同的是,離線可以通過增加一個監控節點的方式,通過group by判斷數據是否重復,而實時任務則非常依賴Flink自身的一致性能力,因而發現和解決問題的成本更高。
其實做到運維這個環節,對人的要求其實是更高的。因為流批一體在運維上會帶來一個好處,即實時任務和離線任務能夠錯峰執行,實時在白天壓力大,而離線在晚上壓力大。但同樣的,這種方式對於維護者而言更加痛苦,因為不僅晚上要熬夜值班,白天同樣不能休息,在大促期間甚至需要輪班來維護任務,可以說是「匯報一時爽,痛苦長相伴」。
從遠處來看,流任務和批任務,在自身的機制上就存在非常大的差異,批程序面上的是特定時間內相對靜態的數據,而流程序處理的則是change-log,雖然有可能數據在表結構層面,通過數據模型的設計來保持一致,但是在語義層面,其根本還是不一樣的。這一點可能是最制約批流一體發展的問題,也是最難實現統一或者永遠也不可能統一的。
綜上,對於實時模型,開發工具需要將監控實時部分的能力進行補全,就像DWD層需要分別維護實時和離線兩套架構一樣,開發工具也需要分別維護兩套架構的結果,因而現階段的實時開發,還做不到降低維護和開發的成本,只能減輕其中部分環節的工作量。
以上講了很長時間的實時模型,但從實際的效果上看,業務並不會感知到多麼明顯的技術變化,相反會有一種「面子工程」的感覺在裡面。
當然,我並不否認實時的價值,在「搜廣推」這三個技術佔主導的領域內,作用還是很大的。但實時畢竟要比離線的內容,更加的難以理解,出現問題的排查成本也更高。這種復雜性使得我們在應對變化時,往往做不出有效的應對,就會變得特別被動。
因而,說一句事後的話,就是「實時的價值取決於業務方,而不是技術方」。只有業務對實時痛點強烈的場景下,我們做如此復雜的研究和應對,才能體現出自己的價值,更多的時候,是在「王婆賣瓜,自賣自誇」。有這種投入,還不如多招幾個分析師更靠譜和實在。
本人之前的文章《天下數據,唯快不破》,重點強調了一個「快」字。但「天下熙熙皆為利來,天下攘攘皆為利往」,這個快更多的是在講應對「變化」的快,而不是「技術」自己的快。
所以,為了以後的職業發展,我們要跟進實時技術的變化,但從自身的工作角度出發,如何應對業務的變化,才是自己要關心的課題。
❽ 基於Flink的實時計算平台的構建
一、系統架構
1. 接入層
Canal、Flume、Kafka
針對業務系統數據,Canal監控Binlog日誌,發送至kafka;
針對日誌數據,由Flume來進行統一收集,並發送至kafka。
消息隊列的數據既是離線數倉的原始數據,也是實時計算的原始數據,這樣可以保證實時和離線的原始數據是統一的。
2. 計算層
Flink
有了源數據,在 計算層 經過Flink實時計算引擎做一些加工處理,然後落地到存儲層中不同存儲介質當中。
3. 存儲層
HBase、Kafka、ES、Mysql、Hive、Redis
不同的 存儲介質 是通過不同的應用場景來選擇。
4. 數據應用層
風控、模型、圖譜、大屏展示
通過存儲層應用於不同的 數據應用 ,數據應用可能是我們的正式產品或者直接的業務系統
二、技術實現
1. 計算引擎
實時計算引擎的功能要求
提供高級 API,支持常見的數據操作比如關聯聚合,最好是能支持 SQL
具有狀態管理和自動支持久化方案,減少對存儲的依賴
可靠的容錯機制,低延時,最好能夠保證Exactly-once
Flink的優勢
Flink的API、容錯機制與狀態管理都滿足實時數倉計算引擎的需求
Flink高吞吐、低延時的特性
端到端的Exactly-once
WaterMark&Event Time的支持
Flink 不僅支持了大量常用的 SQL 語句,還有豐富的數據類型、內置函數以及靈活的自定義函數,基本覆蓋了我們的開發場景
2. 存儲引擎
根據不同的業務場景,使用最適合的存儲引擎:
Kafka主要用於中間數據表的存儲
ES主要針對日誌數據的存儲和分析
HBase、Redis可用於維表存儲
Hive用於數據校驗
Mysql可以用於指標計算結果的存儲
三、數據分層
數據源:目前數據源主要是Binlog,通過Canal監控各個業務系統的Mysql,將binlog發送至kafka。
ODS層:主要將Binlog數據存儲至Kafka,這一層不對數據進行任何操作,存儲最原始的數據,Binlog 日誌在這一層為庫級別,即:一個庫的變更數據存放在同一個 Kafka Topic 中。
DWD層:主要對數據進行簡單的清洗。拆分主題,將庫級別的主題拆分為表級別;打平數據,將data數組格式打平。
DWS層:主要根據不同的業務的需求,將該需求所涉及到的表進行join所得。
APP層:根據指標計算需求,對數據進行處理後,存儲HBase,為了方便模型查詢,主要將表存儲為索引表和明細表,直接對數據進行指標計算後,將計算結果存儲到HBase。
四、數據監控及校驗
1. 數據監控
目前數據的監控的架構是pushgateway + Prometheus + Grafana
數據監控主要是接入Flink的Metric,通過Grafana對Flink系統指標及自定義指標進行圖形化界面的展示,對關鍵指標進行監控報警
2. 數據校驗
目前數據的監控的架構是Grafana + Mysql
Grafana用於監控指標的展示及相關閾值數據的報警,Mysql主要用於監控數據的存儲
將每個服務的source收到的數據、sink發出的數據,根據表的不同將數據關鍵欄位寫入mysql中,通過統計各個階段各個表中的數據條數,對數據完整性進行監控校驗,若出現數據缺時,先查找原因,然後指定時間戳重啟服務
五、系統管理
元數據管理
表,欄位元數據管理,實時感知元數據的變化,大幅度降低使用數據的成本。
系統配置
對應用啟動參數及相關配置參數的管理,對任務進行靈活配置及管理。
血緣管理
主要是梳理實時計算平台中數據依賴關系,以及實時任務的依賴關系,從底層ODS到DWD再到DWS,以及APP層用到哪些數據,將整個鏈度串聯起來。
六、問題及解決方案
1. 數據傾斜
由於要拆分主題,要以table為key對數據進行keyBy,但是由於每個表的數據量相差較大,會出現數據傾斜
解決方案:
加鹽,給key加前綴
前綴不能隨便加,為了保證同一id的數據在相同的分區中,所以根據id_table進行keyBy
2. 數據重復
任務在進行自動或手動重啟時,為了保證數據不丟失,數據會出現重復計算的問題,如果下游只是對數據進行HBase存儲的話,由於冪等性,這種重復可以解。但是,如果下游要對數據進行聚合,這樣會導致數據被計算多次,影響計算結果的准確性
解決方案:
上游在對數據進行發送時,對kafka procer 進行 exactly once的設置
在對數據統計時進行數據去重
3. 數據延時
由於所處理的數據表的大小不一樣,處理大表時,會出現數據延時的問題。
解決方案:
針對大表數據增加並行度
4.數據亂序
由於Flink kafka procer默認是根據hash對數據進行隨機分區,kafka consumer在對數據進行消費時,每個分區消費速度不同,這樣最終在存儲數據時,就會出現亂序即相同的id會出現老數據覆蓋新數據的問題
解決方案:
對kafka每個階段進行自定義分區,將id相同的數據分到同一個分區,保證同一id的數據的有序性
由於整個數據處理過程中可能會出現shuffle,導數數據重新亂序,所以在對數據存儲前對數據進行排序
對數據進行排序的關鍵點時要保證每條數據的唯一性,即要有標記數據先後順序的欄位
5 . 數據唯一標記(很重要)
由於要對數據進行去重或者排序,所以要保證數據的唯一性
解決辦法:
使用時間戳不可以,因為數據量很大的情況下,同一時間會處理上百條數據
在最初發出數據的時候,為數據打上標記,使用 partition + offset + idx 的組合來確認數據的唯一性及順序性
6. 數據可靠性
我們對服務重啟或對服務升級時,可能會出現數據的丟失
解決方案:
結合Flink 的checkpoint及savepoint機制保證數據的可靠性
開啟Flink的checkpoint機制,服務進行自動重啟時,會自動讀取上次保存在checkpoint中offset,或者我們指定offset進行數據消費
對服務進行升級時,先將服務的狀態保存至savepoint中,重啟時指定savepoint進行服務啟動,保證數據不丟失
7. 無感升級
由於我們目前數據量比較龐大,且在對服務進行升級時,耗時較長,會影響調用方的使用。
解決辦法:
在對服務進行升級時,將數據寫入備用庫,等數據追上且服務穩定運行後,再將存儲庫進行切換
❾ flink 讀取mysql並使用flink sql
1.mysql連接
2.flink sql
3.dependency
❿ flink sql 知其所以然(十三):流 join問題解決
本節是 flink sql 流 join 系列的下篇,上篇的鏈接如下:
廢話不多說,咱們先直接上本文的目錄和結論,小夥伴可以先看結論快速了解博主期望本文能給小夥伴們帶來什麼幫助:
書接上文,上文介紹了曝光流在關聯點擊流時,使用 flink sql regular join 存在的 retract 問題。
本文介紹怎麼使用 flink sql interval join 解決這些問題。
flink sql 知其所以然(十二):流 join 很難嘛???(上)
看看上節的實際案例,來看看在具體輸入值的場景下,輸出值應該長啥樣。
場景:即常見的曝光日誌流(show_log)通過 log_id 關聯點擊日誌流(click_log),將數據的關聯結果進行下發。
來一波輸入數據:
曝光數據:
點擊數據:
預期輸出數據如下:
上節的 flink sql regular join 解決方案如下:
上節說道,flink sql left join 在流數據到達時,如果左表流(show_log)join 不到右表流(click_log) ,則不會等待右流直接輸出(show_log,null),在後續右表流數據代打時,會將(show_log,null)撤回,發送(show_log,click_log)。這就是為什麼產生了 retract 流,從而導致重復寫入 kafka。
對此,我們也是提出了對應的解決思路,既然 left join 中左流不會等待右流,那麼能不能讓左流強行等待右流一段時間,實在等不到在數據關聯不到的數據即可。
當當當!!!
本文的 flink sql interval join 登場,它就能等。
大家先通過下面這句話和圖簡單了解一下 interval join 的作用(熟悉 DataStream 的小夥伴萌可能已經使用過了),後續會詳細介紹原理。
interval join 就是用一個流的數據去關聯另一個流的一段時間區間內的數據。關聯到就下發關聯到的數據,關聯不到且在超時後就根據是否是 outer join(left join,right join,full join)下發沒關聯到的數據。
<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">interval join</figcaption>
來看看上述案例的 flink sql interval join sql 怎麼寫:
這里設置了 show_log.row_time BETWEEN click_log.row_time - INTERVAL ཆ' MINUTE AND click_log.row_time + INTERVAL ཆ' MINUTE 代表 show_log 表中的數據會和 click_log 表中的 row_time 在前後 10 分鍾之內的數據進行關聯。
運行結果如下:
如上就是我們期望的正確結果了。
flink web ui 運算元圖如下:
<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">flink web ui</figcaption>
那麼此時你可能有一個問題,結果中的前兩條數據 join 到了輸出我是理解的,那當 show_log join 不到 click_log 時為啥也輸出了?原理是啥?
博主帶你們來定位到具體的實現源碼。先看一下 transformations。
<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">transformations</figcaption>
可以看到事件時間下 interval join 的具體 operator 是 org.apache.flink.table.runtime.operators.join. 。
其核心邏輯就集中在 processElement1 和 processElement2 中,在 processElement1 和 processElement2 中使用 org.apache.flink.table.runtime.operators.join.interval.RowTimeIntervalJoin 來處理具體 join 邏輯。 RowTimeIntervalJoin 重要方法如下圖所示。
TimeIntervalJoin
下面詳細給大家解釋一下。
join 時,左流和右流會在 interval 時間之內相互等待,如果等到了則輸出數據[+(show_log,click_log)],如果等不到,並且另一條流的時間已經推進到當前這條數據在也不可能 join 到另一條流的數據時,則直接輸出[+(show_log,null)],[+(null,click_log)]。
舉個例子, show_log.row_time BETWEEN click_log.row_time - INTERVAL ཆ' MINUTE AND click_log.row_time + INTERVAL ཆ' MINUTE , 當 click_log 的時間推進到 2021-11-01 11:00:00 時,這時 show_log 來一條 2021-11-01 02:00:00 的數據, 那這條 show_log 必然不可能和 click_log 中的數據 join 到了,因為 click_log 中 2021-11-01 01:50:00 到 2021-11-01 02:10:00 之間的數據以及過期刪除了。則 show_log 直接輸出 [+(show_log,null)]
以上面案例的 show_log(左表) interval join click_log(右表) 為例(不管是 inner interval join,left interval join,right interval join 還是 full interval join,都會按照下面的流程執行):
上面只是左流 show_log 數據到達時的執行流程(即 ProcessElement1 ),當右流 click_log 到達時也是完全類似的執行流程(即 ProcessElement2 )。
小夥伴萌在使用 interval join 需要注意的兩點事項:
本文主要介紹了 flink sql interval 是怎麼避免出現 flink regular join 存在的 retract 問題的,並通過解析其實現說明了運行原理,博主期望你讀完本文之後能了解到: