Ⅰ Flink sql Query 語法(一)
SELECT 語句和 VALUES 語句需要使用 TableEnvironment 的 sqlQuery() 方法加以指定,會以 Table 的形式返回 SELECT (或 VALUE)吵桐迅的查詢結果。Table 可被用於 SQL 或 Table API 查詢、轉換為 DataSet 或 DataStream、輸出到 TableSink。SQL 與 Table API 的查詢可以進行無縫融合、整體優化。
為了可以在 SQL 查詢中訪問到表,需要先在 TableEnvironment 中 注冊表 (可以通過 TableSource、Table、CREATE TABLE 語句、DataStream 或 DataSet 注冊)。為方便起見 Table.toString() 將會在其 TableEnvironment 中以唯一的名稱自動注冊表,並返回名稱。
注意: 查詢若包括了不支持的 SQL 特性,將會拋出 TableException 。
以下示例顯示如何在已注冊和內聯表上指定 SQL 查詢。
SELECT 語句或者 VALUES 語句可以通過 TableEnvironment.executeSql() 方法來執行,該方法返回 TableResult 對象用於包裝查詢的結果,一個 Table 對象可以通過 Table.execute() 方法執行獲取查詢結果。 TableResult.collect() 方法返回一個可以關閉升此的行迭代器(除非所有的數據都被收集到本地,否則一個查詢作業永遠不會結束。所以通過 CloseableIterator#close() 方法主動地關閉作業以防止資源泄露)。 還可以通過 TableResult.print() 方法將查詢結果列印到控制台。TableResult 中的結果數據只能被訪問一次,因此一個 TableResult 實例中, collect() 方法和 print() 方法不能被同時使用。
TableResult.collect() 與 TableResult.print() 的行為在不同的 checkpointing 模式下略有不同。
Flink 通過支持標准 ANSI SQL的 Apache Calcite 解析 SQL。以下「BNF-語法」描述了批處理和流處理查詢中所支持的 SQL 特性的超集。
Flink SQL 對於標識符(表、屬性、函數名)的命名策略類似於 Java 的詞法約定:
字元串文本常量需要被單引號包起來(如 SELECT 'Hello World' )。兩個單引號表示轉義(如 SELECT 'It''s me.' )。字元串文本常量支持 Unicode 字元,如需明確使用 Unicode 編碼,請使用以下語法:
WITH 提供了編寫輔助語句的方法,以便在更大的查詢中使用。這些語句通常被稱為公共表表達式(Common Table Expression,CTE),可以認為它定義了只存在於一個查詢中的臨時視圖。
WITH 語法:
下面的示例定義了一個 CTE: orders_with_total ,並在 GROUP BY 查詢中使用它。
SELECT 語句的一般語法為:
table_expression 可以是任何數據源(表、視圖、VALUES 子句、多個表的 Join 結果、子查詢)。下面的事例讀取 Orders 表的所有列:
select_list 指定 * 表示解析所有的列,但是不建議在生產環境中使用,會降低性能,建議輪跡只查詢需要的列:
查詢可以使用 VALUES 子句,每個元組(Tuple)對應一個 Row,並且可以設置別名:
WHERE 語句可以過濾 Row:
可以對每行數據的指定列調用函數(內置、自定義函數,自定義函數必須提前注冊):
如果指定 SELECT DISTINCT,則將從結果集中刪除重復行(每組重復中保留一行)。
對於流式查詢,計算查詢結果所需的狀態(State)可能會無限增長。狀態大小取決於不同行的數量。可<u>以為查詢配置適當的狀態生存時間(TTL),以防止狀態大小過大。這可能會影響查詢結果的正確性</u>。
Window 是流處理的核心。Windows 將流拆分為有限大小的片段應用計算。只有流處理支持。
Flink 1.13 提供了幾個 Table-valued functions(TVF,區別於 Group Window Function),將表中的元素劃分為 windows,包括:
- 滾動窗口(Tumbling windows)
- 滑動窗口(Hop, Sliding windows)
- 累加窗口(Cumulate windows)
- 會話窗口(Session windows,TVF 暫不支持)
每個元素在邏輯上可以屬於多個窗口,具體取決於所使用的窗口函數。TVF 必須和聚合操作一起使用:
假設存在一個 Bid 表
指定一個固定大小的窗口,並且不重疊,語法:
設定一個10分鍾大小的滾動窗口,
指定一個固定大小的窗口,設定滑動間隔,元素會被指定給多個窗口,語法:
設定一個10分鍾大小,每5分鍾滑動的窗口,
指定一個窗口的最大規模,按照指定時間間隔增長累加,直到達到窗口的最大規模,每次窗口增長會進行一次計算,可以理解為多次計算的滾動窗口,語法:
設定一個10分鍾大小,每2分鍾累計一次的窗口,
Ⅱ Flink SQL 知其所以然(五)| 自定義 protobuf format
protobuf 作為目前各大公司中最廣泛使用的高效的協議數據交換格式工具庫,會大量作為流式數據傳輸的序列化方式,所以在 flink sql 中如果能實現 protobuf 的 format 會非常有用( 目前社區已經有對應的實現,不過目前還沒有 merge,預計在 1.14 系列版本中能 release )。
issue 見: https://issues.apache.org/jira/browse/FLINK-18202?filter=-4&jql=project%20%3D%20FLINK%20AND%20issuetype%20%3D%20%22New%20Feature%22%20AND%20text%20~%20protobuf%20order%20by%20created%20DESC
pr 見: https://github.com/apache/flink/pull/14376
這一節主要介紹 flink sql 中怎麼自定義實現 format ,其中以最常使用的 protobuf 作為案例來介紹。
如果想在本地直接測試下:
關於為什麼選擇 protobuf 可以看這篇文章,寫的很詳細:
http://hengyunabc.github.io/thinking-about-grpc-protobuf/?utm_source=tuicool&utm_medium=referral
在實時計算的領域中,為了可讀性會選擇 json ,為了效率以及一些已經依賴了 grpc 的公司會選擇 protobuf 來做數據序列化,那麼自然而然,日誌的序列化方式也會選擇 protobuf 。
而官方目前已經 release 的版本中是沒有提供 flink sql api 的 protobuf format 的。如下圖,基於 1.13 版本。
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/
因此本文在介紹怎樣自定義一個 format 的同時,實現一個 protobuf format 來給大家使用。
預期效果是先實現幾種最基本的數據類型,包括 protobuf 中的 message (自定義 model)、 map (映射)、 repeated (列表)、其他基本數據類型等,這些都是我們最常使用的類型。
預期 protobuf message 定義如下:
測試數據源數據如下,博主把 protobuf 的數據轉換為 json,以方便展示,如下圖:
預期 flink sql:
數據源表 DDL:
數據匯表 DDL:
Transform 執行邏輯:
下面是我在本地跑的結果:
可以看到列印的結果,數據是正確的被反序列化讀入,並且最終輸出到 console。
目前業界可以參考的實現如下: https://github.com/maosuhan/flink-pb , 也就是這位哥們負責目前 flink protobuf 的 format。
這種實現的具體使用方式如下:
其實現有幾個特點:
[圖片上傳失敗...(image-66c35b-1644940704671)]
其實上節已經詳細描述了 flink sql 對於 sourcesinkformat 的載入機制。
如圖 serde format 是通過 TableFactoryHelper.discoverDecodingFormat 和 TableFactoryHelper.discoverEncodingFormat 創建的
所有通過 SPI 的 sourcesinkformt 插件都繼承自 Factory 。
整體創建 format 方法的調用鏈如下圖。
最終實現如下,涉及到了幾個實現類:
具體流程:
上述實現類的具體關系如下:
介紹完流程,進入具體實現方案細節:
ProtobufFormatFactory 主要創建 format 的邏輯:
resourcesMETA-INF 文件:
主要實現反序列化的邏輯:
可以注意到上述反序列化的主要邏輯就集中在 runtimeConverter 上,即 ProtobufToRowDataConverters.ProtobufToRowDataConverter 。
ProtobufToRowDataConverters.ProtobufToRowDataConverter 就是在 ProtobufToRowDataConverters 中定義的。
ProtobufToRowDataConverters.ProtobufToRowDataConverter 其實就是一個 convertor 介面:
其作用就是將 protobuf message 中的每一個欄位轉換成為 RowData 中的每一個欄位。
ProtobufToRowDataConverters 中就定義了具體轉換邏輯,如截圖所示,每一個 LogicalType 都定義了 protobuf message 欄位轉換為 flink 數據類型的邏輯:
源碼後台回復 flink sql 知其所以然(五)| 自定義 protobuf format 獲取。
本文主要是針對 flink sql protobuf format 進行了原理解釋以及對應的實現。
如果你正好需要這么一個 format,直接後台回復 flink sql 知其所以然(五)| 自定義 protobuf format 獲取源碼吧。
當然上述只是 protobuf format 一個基礎的實現,用於生產環境還有很多方面可以去擴展的。
Ⅲ flink1.12 sql向redis實時寫數據
基於 bahir-flink 二次開發,使它支持SQL直接定義寫入redis,用戶通過DDL指定自己需要保存的欄位。
命令行執行 mvn package -DskipTests=true打包後,將生成的包flink-connector-redis_2.12-1.11.1.jar引入flink lib中即可,無需其它設置。
相對上一個版本簡化了參數設置,思路更清晰,上一版本欄位的值會根據主鍵等條件來自動生成,這要求使用者需要了解相關規則,有一定的學習成本並且容易埋坑,重構後欄位的值由用戶在DDL中顯示地指定,如下:
取消了必須有主鍵的限制,使用更簡單,如果有多個欄位組合成key或者value,需要用戶在DML中使用concat_ws等方式組裝,不再是插件在後台用不可見字元拼裝。
Ⅳ flink開窗函數
FlinkSQL
窗口:
TUMBLE(TABLE data, DESCRIPTOR(timecol), size)
HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])
SESSION(<time-attr>, <gap-interval>)
<gap-interval>: INTERVAL 'string' timeUnit
累積窗口函數:飢漏CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)
窗口分組聚合GROUPING SETS
over函數
CUBE
Flink DataStream
增量聚合和全量聚合
增量聚合: 窗此肢行口不維護原始數據,森嘩只維護中間結果,每次基於中間結果和增量數據進行聚合。
如: ReceFunction、AggregateFunction
全量聚合: 窗口需要維護全部原始數據,窗口觸發進行全量聚合。如:ProcessWindowFunction
Ⅳ flink1.12.1擴展flink-sql 支持寫入到sqlserver
目前業務上有同步數據到sqlServer的需求,但是flink1.12.1版本的JdbcDialects不支持SqlServerDialect,
科學上網後發現袋鼠雲的flinkStreamSql已經有支持sqlserver,那就開始動手,參考實現一波
主要實現getUpsertStatement的方法,本來以為能直接一波flinkStreamSql 的實現,結果發現
報錯 SQL statement must not contain ? character.
查看源碼發現, flink在構建mysql的Statement,是先把需要替換的欄位前面拼接了 : fieldNames,然後在org.apache.flink.connector.jdbc.statement.類的parseNamedStatement 替換成 ?號, 既然如此,就針對了buildDualQueryStatement進行修改
完整的SqlServerDialect文件
最後替換原有的flink-jar包後,就可以用類似flink定義mysql的ddl進行定義表了
注意url寫法為:jdbc:jtds:sqlserver://xxx:1433;databaseName=master;
[flinkStreamSQL鏈接] https://github.com/DTStack/flinkStreamSQL/blob/1.11_release/sqlserver/sqlserver-sink/src/main/java/com/dtstack/flink/sql/sink/sqlserver/SqlserverDialect.java
Ⅵ flink sql 知其所以然(十三):流 join問題解決
本節是 flink sql 流 join 系列的下篇,上篇的鏈接如下:
廢話不多說,咱們先直接上本文的目錄和結論,小夥伴可以先看結論快速了解博主期望本文能給小夥伴們帶來什麼幫助:
書接上文,上文介紹了曝光流在關聯點擊流時,使用 flink sql regular join 存在的 retract 問題。
本文介紹怎麼使用 flink sql interval join 解決這些問題。
flink sql 知其所以然(十二):流 join 很難嘛???(上)
看看上節的實際案例,來看看在具體輸入值的場景下,輸出值應該長啥樣。
場景:即常見的曝光日誌流(show_log)通過 log_id 關聯點擊日誌流(click_log),將數據的關聯結果進行下發。
來一波輸入數據:
曝光數據:
點擊數據:
預期輸出數據如下:
上節的 flink sql regular join 解決方案如下:
上節說道,flink sql left join 在流數據到達時,如果左表流(show_log)join 不到右表流(click_log) ,則不會等待右流直接輸出(show_log,null),在後續右表流數據代打時,會將(show_log,null)撤回,發送(show_log,click_log)。這就是為什麼產生了 retract 流,從而導致重復寫入 kafka。
對此,我們也是提出了對應的解決思路,既然 left join 中左流不會等待右流,那麼能不能讓左流強行等待右流一段時間,實在等不到在數據關聯不到的數據即可。
當當當!!!
本文的 flink sql interval join 登場,它就能等。
大家先通過下面這句話和圖簡單了解一下 interval join 的作用(熟悉 DataStream 的小夥伴萌可能已經使用過了),後續會詳細介紹原理。
interval join 就是用一個流的數據去關聯另一個流的一段時間區間內的數據。關聯到就下發關聯到的數據,關聯不到且在超時後就根據是否是 outer join(left join,right join,full join)下發沒關聯到的數據。
<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">interval join</figcaption>
來看看上述案例的 flink sql interval join sql 怎麼寫:
這里設置了 show_log.row_time BETWEEN click_log.row_time - INTERVAL ཆ' MINUTE AND click_log.row_time + INTERVAL ཆ' MINUTE 代表 show_log 表中的數據會和 click_log 表中的 row_time 在前後 10 分鍾之內的數據進行關聯。
運行結果如下:
如上就是我們期望的正確結果了。
flink web ui 運算元圖如下:
<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">flink web ui</figcaption>
那麼此時你可能有一個問題,結果中的前兩條數據 join 到了輸出我是理解的,那當 show_log join 不到 click_log 時為啥也輸出了?原理是啥?
博主帶你們來定位到具體的實現源碼。先看一下 transformations。
<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">transformations</figcaption>
可以看到事件時間下 interval join 的具體 operator 是 org.apache.flink.table.runtime.operators.join. 。
其核心邏輯就集中在 processElement1 和 processElement2 中,在 processElement1 和 processElement2 中使用 org.apache.flink.table.runtime.operators.join.interval.RowTimeIntervalJoin 來處理具體 join 邏輯。 RowTimeIntervalJoin 重要方法如下圖所示。
TimeIntervalJoin
下面詳細給大家解釋一下。
join 時,左流和右流會在 interval 時間之內相互等待,如果等到了則輸出數據[+(show_log,click_log)],如果等不到,並且另一條流的時間已經推進到當前這條數據在也不可能 join 到另一條流的數據時,則直接輸出[+(show_log,null)],[+(null,click_log)]。
舉個例子, show_log.row_time BETWEEN click_log.row_time - INTERVAL ཆ' MINUTE AND click_log.row_time + INTERVAL ཆ' MINUTE , 當 click_log 的時間推進到 2021-11-01 11:00:00 時,這時 show_log 來一條 2021-11-01 02:00:00 的數據, 那這條 show_log 必然不可能和 click_log 中的數據 join 到了,因為 click_log 中 2021-11-01 01:50:00 到 2021-11-01 02:10:00 之間的數據以及過期刪除了。則 show_log 直接輸出 [+(show_log,null)]
以上面案例的 show_log(左表) interval join click_log(右表) 為例(不管是 inner interval join,left interval join,right interval join 還是 full interval join,都會按照下面的流程執行):
上面只是左流 show_log 數據到達時的執行流程(即 ProcessElement1 ),當右流 click_log 到達時也是完全類似的執行流程(即 ProcessElement2 )。
小夥伴萌在使用 interval join 需要注意的兩點事項:
本文主要介紹了 flink sql interval 是怎麼避免出現 flink regular join 存在的 retract 問題的,並通過解析其實現說明了運行原理,博主期望你讀完本文之後能了解到:
Ⅶ Flink SQL實戰演練之自定義Clickhouse Connector
簡介:實時數倉目前的架構是flink+clickhouse,社區目前jdbc connector不支持clickhouse的方言,所以決定自定義clickhouse connector實現flink sql寫入數據到clickhouse。
目前想要實現flink sql數據落地到ck,可以修改jdbc connector的源碼,增加ck方言,或者採用阿里提供的ck connector包,為了更好的理解flink connector的原理,這里自定義connector實現。
目前支持Flink寫入Clickhouse的依賴哭比較多,如果數據格式固定,可以CSV的方式寫入,如果不固定,可採用Json的方式寫入。
Ⅷ FlinkX快速開始(一)
鏈接地址:胡稿纖 FlinkX
FlinkX是一個基於Flink的批流統一的數據同步工具,既可以採集靜態的數據,比如MySQL,HDFS等,也可以採集實時變化的數據,比如MySQL binlog,Kafka等
1、前置
需要安裝maven、java8、配置好github相關參數
2、Fork FlinX項目到自己的倉庫中
2、Clone項目到本地
git clone https://github.com/liukunyuan/flinkx.git
3、安裝額外的jar包
1)、cd flinkx/bin
2)、執行sh ./install_jars.sh(windows執行install_jars.bat腳本)
4、打包
1)、回到flinkx目錄:cd ..
2)、執行褲仿打包命令敬肢:mvn clean package -Dmaven.test.skip=true
1、配置flink conf文件(暫時不需要安裝flink)
1)、進入flinkconf目錄
cd flinkconf
2)、修改flink-conf.yaml文件添加一行
rest.bind-port: 8888
2、配置mysqltomysql的json文件,路徑:/Users/jack/Documents/jack-project/flinkx/flinkconf/mysql2mysql.json
3、運行任務
4、查看監控網頁和log.txt文件: http://localhost:8888/
Ⅸ Flink1.13 SQL執行 oom 排查
flink on yarn cluster的模式, yarn上的應用經常發生異常, 如jobmanager的oom, zk心跳丟失, slot分配請求超時, hdfs文件已存在等等; 經過排查定位到了是flink sql的解析問題, 像count, where這類的春高圓語句在實際執行的時候變成了全量的查詢
分析mp文件, 得知扒塌內存中存放了該表幾乎全量的數據, 但sql加上where條件後, 實際上數據只有10來條, 是create table階段的問題, 還念春是sql執行階段的問題呢?