⑴ Flink sql Query 語法(一)
SELECT 語句和 VALUES 語句需要使用 TableEnvironment 的 sqlQuery() 方法加以指定,會以 Table 的形式返回 SELECT (或 VALUE)吵桐迅的查詢結果。Table 可被用於 SQL 或 Table API 查詢、轉換為 DataSet 或 DataStream、輸出到 TableSink。SQL 與 Table API 的查詢可以進行無縫融合、整體優化。
為了可以在 SQL 查詢中訪問到表,需要先在 TableEnvironment 中 注冊表 (可以通過 TableSource、Table、CREATE TABLE 語句、DataStream 或 DataSet 注冊)。為方便起見 Table.toString() 將會在其 TableEnvironment 中以唯一的名稱自動注冊表,並返回名稱。
注意: 查詢若包括了不支持的 SQL 特性,將會拋出 TableException 。
以下示例顯示如何在已注冊和內聯表上指定 SQL 查詢。
SELECT 語句或者 VALUES 語句可以通過 TableEnvironment.executeSql() 方法來執行,該方法返回 TableResult 對象用於包裝查詢的結果,一個 Table 對象可以通過 Table.execute() 方法執行獲取查詢結果。 TableResult.collect() 方法返回一個可以關閉升此的行迭代器(除非所有的數據都被收集到本地,否則一個查詢作業永遠不會結束。所以通過 CloseableIterator#close() 方法主動地關閉作業以防止資源泄露)。 還可以通過 TableResult.print() 方法將查詢結果列印到控制台。TableResult 中的結果數據只能被訪問一次,因此一個 TableResult 實例中, collect() 方法和 print() 方法不能被同時使用。
TableResult.collect() 與 TableResult.print() 的行為在不同的 checkpointing 模式下略有不同。
Flink 通過支持標准 ANSI SQL的 Apache Calcite 解析 SQL。以下「BNF-語法」描述了批處理和流處理查詢中所支持的 SQL 特性的超集。
Flink SQL 對於標識符(表、屬性、函數名)的命名策略類似於 Java 的詞法約定:
字元串文本常量需要被單引號包起來(如 SELECT 'Hello World' )。兩個單引號表示轉義(如 SELECT 'It''s me.' )。字元串文本常量支持 Unicode 字元,如需明確使用 Unicode 編碼,請使用以下語法:
WITH 提供了編寫輔助語句的方法,以便在更大的查詢中使用。這些語句通常被稱為公共表表達式(Common Table Expression,CTE),可以認為它定義了只存在於一個查詢中的臨時視圖。
WITH 語法:
下面的示例定義了一個 CTE: orders_with_total ,並在 GROUP BY 查詢中使用它。
SELECT 語句的一般語法為:
table_expression 可以是任何數據源(表、視圖、VALUES 子句、多個表的 Join 結果、子查詢)。下面的事例讀取 Orders 表的所有列:
select_list 指定 * 表示解析所有的列,但是不建議在生產環境中使用,會降低性能,建議輪跡只查詢需要的列:
查詢可以使用 VALUES 子句,每個元組(Tuple)對應一個 Row,並且可以設置別名:
WHERE 語句可以過濾 Row:
可以對每行數據的指定列調用函數(內置、自定義函數,自定義函數必須提前注冊):
如果指定 SELECT DISTINCT,則將從結果集中刪除重復行(每組重復中保留一行)。
對於流式查詢,計算查詢結果所需的狀態(State)可能會無限增長。狀態大小取決於不同行的數量。可<u>以為查詢配置適當的狀態生存時間(TTL),以防止狀態大小過大。這可能會影響查詢結果的正確性</u>。
Window 是流處理的核心。Windows 將流拆分為有限大小的片段應用計算。只有流處理支持。
Flink 1.13 提供了幾個 Table-valued functions(TVF,區別於 Group Window Function),將表中的元素劃分為 windows,包括:
- 滾動窗口(Tumbling windows)
- 滑動窗口(Hop, Sliding windows)
- 累加窗口(Cumulate windows)
- 會話窗口(Session windows,TVF 暫不支持)
每個元素在邏輯上可以屬於多個窗口,具體取決於所使用的窗口函數。TVF 必須和聚合操作一起使用:
假設存在一個 Bid 表
指定一個固定大小的窗口,並且不重疊,語法:
設定一個10分鍾大小的滾動窗口,
指定一個固定大小的窗口,設定滑動間隔,元素會被指定給多個窗口,語法:
設定一個10分鍾大小,每5分鍾滑動的窗口,
指定一個窗口的最大規模,按照指定時間間隔增長累加,直到達到窗口的最大規模,每次窗口增長會進行一次計算,可以理解為多次計算的滾動窗口,語法:
設定一個10分鍾大小,每2分鍾累計一次的窗口,
⑵ flinksql-core-動態表
普通動態表是FlinkSQL中的一類表,表中的數據與連接的外部數據對等,可以簡單理解為把一張mysql的表放進flink內存中得到的表,並且該表與mysql表有連接關系,即該表可以讀寫mysql表。
需要聲明表的欄位定義和表屬性(連接器屬性)。語法如下:
with關鍵字前面的是欄位定義,with關鍵字後面的是表屬性。其中欄位定義時還可以聲明表主鍵,聲明語法為PARIMARY KEY(myColumn1,...) NOT ENFORCED, 這里的not enforced表示flinksql不會對主鍵做強制的唯一性約束、非空約束,而且目前flinksql中只支持這種類型的主鍵。
表屬性中有若干個屬性欄位需要聲明,具體有哪些屬性欄位取決於使用哪個連接器,如上述聲明中使用的是jdbc連接器,在使用該連接器時需要提供url、username、password等屬性,通過此連接器我們就可以讓該表能連接到對應的mysql表。
我們可以查詢flinksql普通動態表的數據,此數據與連接的外部數據是一致的。語法如下:
tips:在運行時,只會載入一次外部數據到flinksql普通動態表。後續外部數據表有更新時,flinksql的普通動態表不會跟著自動更新。
我們可以把數據寫入到flinksql動態表,從而實現寫入數據到外部系統的目的。語法如下:
⑶ flink sql 知其所以然(十三):流 join問題解決
本節是 flink sql 流 join 系列的下篇,上篇的鏈接如下:
廢話不多說,咱們先直接上本文的目錄和結論,小夥伴可以先看結論快速了解博主期望本文能給小夥伴們帶來什麼幫助:
書接上文,上文介紹了曝光流在關聯點擊流時,使用 flink sql regular join 存在的 retract 問題。
本文介紹怎麼使用 flink sql interval join 解決這些問題。
flink sql 知其所以然(十二):流 join 很難嘛???(上)
看看上節的實際案例,來看看在具體輸入值的場景下,輸出值應該長啥樣。
場景:即常見的曝光日誌流(show_log)通過 log_id 關聯點擊日誌流(click_log),將數據的關聯結果進行下發。
來一波輸入數據:
曝光數據:
點擊數據:
預期輸出數據如下:
上節的 flink sql regular join 解決方案如下:
上節說道,flink sql left join 在流數據到達時,如果左表流(show_log)join 不到右表流(click_log) ,則不會等待右流直接輸出(show_log,null),在後續右表流數據代打時,會將(show_log,null)撤回,發送(show_log,click_log)。這就是為什麼產生了 retract 流,從而導致重復寫入 kafka。
對此,我們也是提出了對應的解決思路,既然 left join 中左流不會等待右流,那麼能不能讓左流強行等待右流一段時間,實在等不到在數據關聯不到的數據即可。
當當當!!!
本文的 flink sql interval join 登場,它就能等。
大家先通過下面這句話和圖簡單了解一下 interval join 的作用(熟悉 DataStream 的小夥伴萌可能已經使用過了),後續會詳細介紹原理。
interval join 就是用一個流的數據去關聯另一個流的一段時間區間內的數據。關聯到就下發關聯到的數據,關聯不到且在超時後就根據是否是 outer join(left join,right join,full join)下發沒關聯到的數據。
<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">interval join</figcaption>
來看看上述案例的 flink sql interval join sql 怎麼寫:
這里設置了 show_log.row_time BETWEEN click_log.row_time - INTERVAL ཆ' MINUTE AND click_log.row_time + INTERVAL ཆ' MINUTE 代表 show_log 表中的數據會和 click_log 表中的 row_time 在前後 10 分鍾之內的數據進行關聯。
運行結果如下:
如上就是我們期望的正確結果了。
flink web ui 運算元圖如下:
<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">flink web ui</figcaption>
那麼此時你可能有一個問題,結果中的前兩條數據 join 到了輸出我是理解的,那當 show_log join 不到 click_log 時為啥也輸出了?原理是啥?
博主帶你們來定位到具體的實現源碼。先看一下 transformations。
<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">transformations</figcaption>
可以看到事件時間下 interval join 的具體 operator 是 org.apache.flink.table.runtime.operators.join. 。
其核心邏輯就集中在 processElement1 和 processElement2 中,在 processElement1 和 processElement2 中使用 org.apache.flink.table.runtime.operators.join.interval.RowTimeIntervalJoin 來處理具體 join 邏輯。 RowTimeIntervalJoin 重要方法如下圖所示。
TimeIntervalJoin
下面詳細給大家解釋一下。
join 時,左流和右流會在 interval 時間之內相互等待,如果等到了則輸出數據[+(show_log,click_log)],如果等不到,並且另一條流的時間已經推進到當前這條數據在也不可能 join 到另一條流的數據時,則直接輸出[+(show_log,null)],[+(null,click_log)]。
舉個例子, show_log.row_time BETWEEN click_log.row_time - INTERVAL ཆ' MINUTE AND click_log.row_time + INTERVAL ཆ' MINUTE , 當 click_log 的時間推進到 2021-11-01 11:00:00 時,這時 show_log 來一條 2021-11-01 02:00:00 的數據, 那這條 show_log 必然不可能和 click_log 中的數據 join 到了,因為 click_log 中 2021-11-01 01:50:00 到 2021-11-01 02:10:00 之間的數據以及過期刪除了。則 show_log 直接輸出 [+(show_log,null)]
以上面案例的 show_log(左表) interval join click_log(右表) 為例(不管是 inner interval join,left interval join,right interval join 還是 full interval join,都會按照下面的流程執行):
上面只是左流 show_log 數據到達時的執行流程(即 ProcessElement1 ),當右流 click_log 到達時也是完全類似的執行流程(即 ProcessElement2 )。
小夥伴萌在使用 interval join 需要注意的兩點事項:
本文主要介紹了 flink sql interval 是怎麼避免出現 flink regular join 存在的 retract 問題的,並通過解析其實現說明了運行原理,博主期望你讀完本文之後能了解到:
⑷ flink1.12.1擴展flink-sql 支持寫入到sqlserver
目前業務上有同步數據到sqlServer的需求,但是flink1.12.1版本的JdbcDialects不支持SqlServerDialect,
科學上網後發現袋鼠雲的flinkStreamSql已經有支持sqlserver,那就開始動手,參考實現一波
主要實現getUpsertStatement的方法,本來以為能直接一波flinkStreamSql 的實現,結果發現
報錯 SQL statement must not contain ? character.
查看源碼發現, flink在構建mysql的Statement,是先把需要替換的欄位前面拼接了 : fieldNames,然後在org.apache.flink.connector.jdbc.statement.類的parseNamedStatement 替換成 ?號, 既然如此,就針對了buildDualQueryStatement進行修改
完整的SqlServerDialect文件
最後替換原有的flink-jar包後,就可以用類似flink定義mysql的ddl進行定義表了
注意url寫法為:jdbc:jtds:sqlserver://xxx:1433;databaseName=master;
[flinkStreamSQL鏈接] https://github.com/DTStack/flinkStreamSQL/blob/1.11_release/sqlserver/sqlserver-sink/src/main/java/com/dtstack/flink/sql/sink/sqlserver/SqlserverDialect.java