⑴ Flink sql Query 语法(一)
SELECT 语句和 VALUES 语句需要使用 TableEnvironment 的 sqlQuery() 方法加以指定,会以 Table 的形式返回 SELECT (或 VALUE)吵桐迅的查询结果。Table 可被用于 SQL 或 Table API 查询、转换为 DataSet 或 DataStream、输出到 TableSink。SQL 与 Table API 的查询可以进行无缝融合、整体优化。
为了可以在 SQL 查询中访问到表,需要先在 TableEnvironment 中 注册表 (可以通过 TableSource、Table、CREATE TABLE 语句、DataStream 或 DataSet 注册)。为方便起见 Table.toString() 将会在其 TableEnvironment 中以唯一的名称自动注册表,并返回名称。
注意: 查询若包括了不支持的 SQL 特性,将会抛出 TableException 。
以下示例显示如何在已注册和内联表上指定 SQL 查询。
SELECT 语句或者 VALUES 语句可以通过 TableEnvironment.executeSql() 方法来执行,该方法返回 TableResult 对象用于包装查询的结果,一个 Table 对象可以通过 Table.execute() 方法执行获取查询结果。 TableResult.collect() 方法返回一个可以关闭升此的行迭代器(除非所有的数据都被收集到本地,否则一个查询作业永远不会结束。所以通过 CloseableIterator#close() 方法主动地关闭作业以防止资源泄露)。 还可以通过 TableResult.print() 方法将查询结果打印到控制台。TableResult 中的结果数据只能被访问一次,因此一个 TableResult 实例中, collect() 方法和 print() 方法不能被同时使用。
TableResult.collect() 与 TableResult.print() 的行为在不同的 checkpointing 模式下略有不同。
Flink 通过支持标准 ANSI SQL的 Apache Calcite 解析 SQL。以下“BNF-语法”描述了批处理和流处理查询中所支持的 SQL 特性的超集。
Flink SQL 对于标识符(表、属性、函数名)的命名策略类似于 Java 的词法约定:
字符串文本常量需要被单引号包起来(如 SELECT 'Hello World' )。两个单引号表示转义(如 SELECT 'It''s me.' )。字符串文本常量支持 Unicode 字符,如需明确使用 Unicode 编码,请使用以下语法:
WITH 提供了编写辅助语句的方法,以便在更大的查询中使用。这些语句通常被称为公共表表达式(Common Table Expression,CTE),可以认为它定义了只存在于一个查询中的临时视图。
WITH 语法:
下面的示例定义了一个 CTE: orders_with_total ,并在 GROUP BY 查询中使用它。
SELECT 语句的一般语法为:
table_expression 可以是任何数据源(表、视图、VALUES 子句、多个表的 Join 结果、子查询)。下面的事例读取 Orders 表的所有列:
select_list 指定 * 表示解析所有的列,但是不建议在生产环境中使用,会降低性能,建议轮迹只查询需要的列:
查询可以使用 VALUES 子句,每个元组(Tuple)对应一个 Row,并且可以设置别名:
WHERE 语句可以过滤 Row:
可以对每行数据的指定列调用函数(内置、自定义函数,自定义函数必须提前注册):
如果指定 SELECT DISTINCT,则将从结果集中删除重复行(每组重复中保留一行)。
对于流式查询,计算查询结果所需的状态(State)可能会无限增长。状态大小取决于不同行的数量。可<u>以为查询配置适当的状态生存时间(TTL),以防止状态大小过大。这可能会影响查询结果的正确性</u>。
Window 是流处理的核心。Windows 将流拆分为有限大小的片段应用计算。只有流处理支持。
Flink 1.13 提供了几个 Table-valued functions(TVF,区别于 Group Window Function),将表中的元素划分为 windows,包括:
- 滚动窗口(Tumbling windows)
- 滑动窗口(Hop, Sliding windows)
- 累加窗口(Cumulate windows)
- 会话窗口(Session windows,TVF 暂不支持)
每个元素在逻辑上可以属于多个窗口,具体取决于所使用的窗口函数。TVF 必须和聚合操作一起使用:
假设存在一个 Bid 表
指定一个固定大小的窗口,并且不重叠,语法:
设定一个10分钟大小的滚动窗口,
指定一个固定大小的窗口,设定滑动间隔,元素会被指定给多个窗口,语法:
设定一个10分钟大小,每5分钟滑动的窗口,
指定一个窗口的最大规模,按照指定时间间隔增长累加,直到达到窗口的最大规模,每次窗口增长会进行一次计算,可以理解为多次计算的滚动窗口,语法:
设定一个10分钟大小,每2分钟累计一次的窗口,
⑵ flinksql-core-动态表
普通动态表是FlinkSQL中的一类表,表中的数据与连接的外部数据对等,可以简单理解为把一张mysql的表放进flink内存中得到的表,并且该表与mysql表有连接关系,即该表可以读写mysql表。
需要声明表的字段定义和表属性(连接器属性)。语法如下:
with关键字前面的是字段定义,with关键字后面的是表属性。其中字段定义时还可以声明表主键,声明语法为PARIMARY KEY(myColumn1,...) NOT ENFORCED, 这里的not enforced表示flinksql不会对主键做强制的唯一性约束、非空约束,而且目前flinksql中只支持这种类型的主键。
表属性中有若干个属性字段需要声明,具体有哪些属性字段取决于使用哪个连接器,如上述声明中使用的是jdbc连接器,在使用该连接器时需要提供url、username、password等属性,通过此连接器我们就可以让该表能连接到对应的mysql表。
我们可以查询flinksql普通动态表的数据,此数据与连接的外部数据是一致的。语法如下:
tips:在运行时,只会加载一次外部数据到flinksql普通动态表。后续外部数据表有更新时,flinksql的普通动态表不会跟着自动更新。
我们可以把数据写入到flinksql动态表,从而实现写入数据到外部系统的目的。语法如下:
⑶ flink sql 知其所以然(十三):流 join问题解决
本节是 flink sql 流 join 系列的下篇,上篇的链接如下:
废话不多说,咱们先直接上本文的目录和结论,小伙伴可以先看结论快速了解博主期望本文能给小伙伴们带来什么帮助:
书接上文,上文介绍了曝光流在关联点击流时,使用 flink sql regular join 存在的 retract 问题。
本文介绍怎么使用 flink sql interval join 解决这些问题。
flink sql 知其所以然(十二):流 join 很难嘛???(上)
看看上节的实际案例,来看看在具体输入值的场景下,输出值应该长啥样。
场景:即常见的曝光日志流(show_log)通过 log_id 关联点击日志流(click_log),将数据的关联结果进行下发。
来一波输入数据:
曝光数据:
点击数据:
预期输出数据如下:
上节的 flink sql regular join 解决方案如下:
上节说道,flink sql left join 在流数据到达时,如果左表流(show_log)join 不到右表流(click_log) ,则不会等待右流直接输出(show_log,null),在后续右表流数据代打时,会将(show_log,null)撤回,发送(show_log,click_log)。这就是为什么产生了 retract 流,从而导致重复写入 kafka。
对此,我们也是提出了对应的解决思路,既然 left join 中左流不会等待右流,那么能不能让左流强行等待右流一段时间,实在等不到在数据关联不到的数据即可。
当当当!!!
本文的 flink sql interval join 登场,它就能等。
大家先通过下面这句话和图简单了解一下 interval join 的作用(熟悉 DataStream 的小伙伴萌可能已经使用过了),后续会详细介绍原理。
interval join 就是用一个流的数据去关联另一个流的一段时间区间内的数据。关联到就下发关联到的数据,关联不到且在超时后就根据是否是 outer join(left join,right join,full join)下发没关联到的数据。
<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">interval join</figcaption>
来看看上述案例的 flink sql interval join sql 怎么写:
这里设置了 show_log.row_time BETWEEN click_log.row_time - INTERVAL ཆ' MINUTE AND click_log.row_time + INTERVAL ཆ' MINUTE 代表 show_log 表中的数据会和 click_log 表中的 row_time 在前后 10 分钟之内的数据进行关联。
运行结果如下:
如上就是我们期望的正确结果了。
flink web ui 算子图如下:
<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">flink web ui</figcaption>
那么此时你可能有一个问题,结果中的前两条数据 join 到了输出我是理解的,那当 show_log join 不到 click_log 时为啥也输出了?原理是啥?
博主带你们来定位到具体的实现源码。先看一下 transformations。
<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">transformations</figcaption>
可以看到事件时间下 interval join 的具体 operator 是 org.apache.flink.table.runtime.operators.join. 。
其核心逻辑就集中在 processElement1 和 processElement2 中,在 processElement1 和 processElement2 中使用 org.apache.flink.table.runtime.operators.join.interval.RowTimeIntervalJoin 来处理具体 join 逻辑。 RowTimeIntervalJoin 重要方法如下图所示。
TimeIntervalJoin
下面详细给大家解释一下。
join 时,左流和右流会在 interval 时间之内相互等待,如果等到了则输出数据[+(show_log,click_log)],如果等不到,并且另一条流的时间已经推进到当前这条数据在也不可能 join 到另一条流的数据时,则直接输出[+(show_log,null)],[+(null,click_log)]。
举个例子, show_log.row_time BETWEEN click_log.row_time - INTERVAL ཆ' MINUTE AND click_log.row_time + INTERVAL ཆ' MINUTE , 当 click_log 的时间推进到 2021-11-01 11:00:00 时,这时 show_log 来一条 2021-11-01 02:00:00 的数据, 那这条 show_log 必然不可能和 click_log 中的数据 join 到了,因为 click_log 中 2021-11-01 01:50:00 到 2021-11-01 02:10:00 之间的数据以及过期删除了。则 show_log 直接输出 [+(show_log,null)]
以上面案例的 show_log(左表) interval join click_log(右表) 为例(不管是 inner interval join,left interval join,right interval join 还是 full interval join,都会按照下面的流程执行):
上面只是左流 show_log 数据到达时的执行流程(即 ProcessElement1 ),当右流 click_log 到达时也是完全类似的执行流程(即 ProcessElement2 )。
小伙伴萌在使用 interval join 需要注意的两点事项:
本文主要介绍了 flink sql interval 是怎么避免出现 flink regular join 存在的 retract 问题的,并通过解析其实现说明了运行原理,博主期望你读完本文之后能了解到:
⑷ flink1.12.1扩展flink-sql 支持写入到sqlserver
目前业务上有同步数据到sqlServer的需求,但是flink1.12.1版本的JdbcDialects不支持SqlServerDialect,
科学上网后发现袋鼠云的flinkStreamSql已经有支持sqlserver,那就开始动手,参考实现一波
主要实现getUpsertStatement的方法,本来以为能直接一波flinkStreamSql 的实现,结果发现
报错 SQL statement must not contain ? character.
查看源码发现, flink在构建mysql的Statement,是先把需要替换的字段前面拼接了 : fieldNames,然后在org.apache.flink.connector.jdbc.statement.类的parseNamedStatement 替换成 ?号, 既然如此,就针对了buildDualQueryStatement进行修改
完整的SqlServerDialect文件
最后替换原有的flink-jar包后,就可以用类似flink定义mysql的ddl进行定义表了
注意url写法为:jdbc:jtds:sqlserver://xxx:1433;databaseName=master;
[flinkStreamSQL链接] https://github.com/DTStack/flinkStreamSQL/blob/1.11_release/sqlserver/sqlserver-sink/src/main/java/com/dtstack/flink/sql/sink/sqlserver/SqlserverDialect.java