Ⅰ Flink sql Query 语法(一)
SELECT 语句和 VALUES 语句需要使用 TableEnvironment 的 sqlQuery() 方法加以指定,会以 Table 的形式返回 SELECT (或 VALUE)吵桐迅的查询结果。Table 可被用于 SQL 或 Table API 查询、转换为 DataSet 或 DataStream、输出到 TableSink。SQL 与 Table API 的查询可以进行无缝融合、整体优化。
为了可以在 SQL 查询中访问到表,需要先在 TableEnvironment 中 注册表 (可以通过 TableSource、Table、CREATE TABLE 语句、DataStream 或 DataSet 注册)。为方便起见 Table.toString() 将会在其 TableEnvironment 中以唯一的名称自动注册表,并返回名称。
注意: 查询若包括了不支持的 SQL 特性,将会抛出 TableException 。
以下示例显示如何在已注册和内联表上指定 SQL 查询。
SELECT 语句或者 VALUES 语句可以通过 TableEnvironment.executeSql() 方法来执行,该方法返回 TableResult 对象用于包装查询的结果,一个 Table 对象可以通过 Table.execute() 方法执行获取查询结果。 TableResult.collect() 方法返回一个可以关闭升此的行迭代器(除非所有的数据都被收集到本地,否则一个查询作业永远不会结束。所以通过 CloseableIterator#close() 方法主动地关闭作业以防止资源泄露)。 还可以通过 TableResult.print() 方法将查询结果打印到控制台。TableResult 中的结果数据只能被访问一次,因此一个 TableResult 实例中, collect() 方法和 print() 方法不能被同时使用。
TableResult.collect() 与 TableResult.print() 的行为在不同的 checkpointing 模式下略有不同。
Flink 通过支持标准 ANSI SQL的 Apache Calcite 解析 SQL。以下“BNF-语法”描述了批处理和流处理查询中所支持的 SQL 特性的超集。
Flink SQL 对于标识符(表、属性、函数名)的命名策略类似于 Java 的词法约定:
字符串文本常量需要被单引号包起来(如 SELECT 'Hello World' )。两个单引号表示转义(如 SELECT 'It''s me.' )。字符串文本常量支持 Unicode 字符,如需明确使用 Unicode 编码,请使用以下语法:
WITH 提供了编写辅助语句的方法,以便在更大的查询中使用。这些语句通常被称为公共表表达式(Common Table Expression,CTE),可以认为它定义了只存在于一个查询中的临时视图。
WITH 语法:
下面的示例定义了一个 CTE: orders_with_total ,并在 GROUP BY 查询中使用它。
SELECT 语句的一般语法为:
table_expression 可以是任何数据源(表、视图、VALUES 子句、多个表的 Join 结果、子查询)。下面的事例读取 Orders 表的所有列:
select_list 指定 * 表示解析所有的列,但是不建议在生产环境中使用,会降低性能,建议轮迹只查询需要的列:
查询可以使用 VALUES 子句,每个元组(Tuple)对应一个 Row,并且可以设置别名:
WHERE 语句可以过滤 Row:
可以对每行数据的指定列调用函数(内置、自定义函数,自定义函数必须提前注册):
如果指定 SELECT DISTINCT,则将从结果集中删除重复行(每组重复中保留一行)。
对于流式查询,计算查询结果所需的状态(State)可能会无限增长。状态大小取决于不同行的数量。可<u>以为查询配置适当的状态生存时间(TTL),以防止状态大小过大。这可能会影响查询结果的正确性</u>。
Window 是流处理的核心。Windows 将流拆分为有限大小的片段应用计算。只有流处理支持。
Flink 1.13 提供了几个 Table-valued functions(TVF,区别于 Group Window Function),将表中的元素划分为 windows,包括:
- 滚动窗口(Tumbling windows)
- 滑动窗口(Hop, Sliding windows)
- 累加窗口(Cumulate windows)
- 会话窗口(Session windows,TVF 暂不支持)
每个元素在逻辑上可以属于多个窗口,具体取决于所使用的窗口函数。TVF 必须和聚合操作一起使用:
假设存在一个 Bid 表
指定一个固定大小的窗口,并且不重叠,语法:
设定一个10分钟大小的滚动窗口,
指定一个固定大小的窗口,设定滑动间隔,元素会被指定给多个窗口,语法:
设定一个10分钟大小,每5分钟滑动的窗口,
指定一个窗口的最大规模,按照指定时间间隔增长累加,直到达到窗口的最大规模,每次窗口增长会进行一次计算,可以理解为多次计算的滚动窗口,语法:
设定一个10分钟大小,每2分钟累计一次的窗口,
Ⅱ Flink SQL 知其所以然(五)| 自定义 protobuf format
protobuf 作为目前各大公司中最广泛使用的高效的协议数据交换格式工具库,会大量作为流式数据传输的序列化方式,所以在 flink sql 中如果能实现 protobuf 的 format 会非常有用( 目前社区已经有对应的实现,不过目前还没有 merge,预计在 1.14 系列版本中能 release )。
issue 见: https://issues.apache.org/jira/browse/FLINK-18202?filter=-4&jql=project%20%3D%20FLINK%20AND%20issuetype%20%3D%20%22New%20Feature%22%20AND%20text%20~%20protobuf%20order%20by%20created%20DESC
pr 见: https://github.com/apache/flink/pull/14376
这一节主要介绍 flink sql 中怎么自定义实现 format ,其中以最常使用的 protobuf 作为案例来介绍。
如果想在本地直接测试下:
关于为什么选择 protobuf 可以看这篇文章,写的很详细:
http://hengyunabc.github.io/thinking-about-grpc-protobuf/?utm_source=tuicool&utm_medium=referral
在实时计算的领域中,为了可读性会选择 json ,为了效率以及一些已经依赖了 grpc 的公司会选择 protobuf 来做数据序列化,那么自然而然,日志的序列化方式也会选择 protobuf 。
而官方目前已经 release 的版本中是没有提供 flink sql api 的 protobuf format 的。如下图,基于 1.13 版本。
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/
因此本文在介绍怎样自定义一个 format 的同时,实现一个 protobuf format 来给大家使用。
预期效果是先实现几种最基本的数据类型,包括 protobuf 中的 message (自定义 model)、 map (映射)、 repeated (列表)、其他基本数据类型等,这些都是我们最常使用的类型。
预期 protobuf message 定义如下:
测试数据源数据如下,博主把 protobuf 的数据转换为 json,以方便展示,如下图:
预期 flink sql:
数据源表 DDL:
数据汇表 DDL:
Transform 执行逻辑:
下面是我在本地跑的结果:
可以看到打印的结果,数据是正确的被反序列化读入,并且最终输出到 console。
目前业界可以参考的实现如下: https://github.com/maosuhan/flink-pb , 也就是这位哥们负责目前 flink protobuf 的 format。
这种实现的具体使用方式如下:
其实现有几个特点:
[图片上传失败...(image-66c35b-1644940704671)]
其实上节已经详细描述了 flink sql 对于 sourcesinkformat 的加载机制。
如图 serde format 是通过 TableFactoryHelper.discoverDecodingFormat 和 TableFactoryHelper.discoverEncodingFormat 创建的
所有通过 SPI 的 sourcesinkformt 插件都继承自 Factory 。
整体创建 format 方法的调用链如下图。
最终实现如下,涉及到了几个实现类:
具体流程:
上述实现类的具体关系如下:
介绍完流程,进入具体实现方案细节:
ProtobufFormatFactory 主要创建 format 的逻辑:
resourcesMETA-INF 文件:
主要实现反序列化的逻辑:
可以注意到上述反序列化的主要逻辑就集中在 runtimeConverter 上,即 ProtobufToRowDataConverters.ProtobufToRowDataConverter 。
ProtobufToRowDataConverters.ProtobufToRowDataConverter 就是在 ProtobufToRowDataConverters 中定义的。
ProtobufToRowDataConverters.ProtobufToRowDataConverter 其实就是一个 convertor 接口:
其作用就是将 protobuf message 中的每一个字段转换成为 RowData 中的每一个字段。
ProtobufToRowDataConverters 中就定义了具体转换逻辑,如截图所示,每一个 LogicalType 都定义了 protobuf message 字段转换为 flink 数据类型的逻辑:
源码后台回复 flink sql 知其所以然(五)| 自定义 protobuf format 获取。
本文主要是针对 flink sql protobuf format 进行了原理解释以及对应的实现。
如果你正好需要这么一个 format,直接后台回复 flink sql 知其所以然(五)| 自定义 protobuf format 获取源码吧。
当然上述只是 protobuf format 一个基础的实现,用于生产环境还有很多方面可以去扩展的。
Ⅲ flink1.12 sql向redis实时写数据
基于 bahir-flink 二次开发,使它支持SQL直接定义写入redis,用户通过DDL指定自己需要保存的字段。
命令行执行 mvn package -DskipTests=true打包后,将生成的包flink-connector-redis_2.12-1.11.1.jar引入flink lib中即可,无需其它设置。
相对上一个版本简化了参数设置,思路更清晰,上一版本字段的值会根据主键等条件来自动生成,这要求使用者需要了解相关规则,有一定的学习成本并且容易埋坑,重构后字段的值由用户在DDL中显示地指定,如下:
取消了必须有主键的限制,使用更简单,如果有多个字段组合成key或者value,需要用户在DML中使用concat_ws等方式组装,不再是插件在后台用不可见字符拼装。
Ⅳ flink开窗函数
FlinkSQL
窗口:
TUMBLE(TABLE data, DESCRIPTOR(timecol), size)
HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])
SESSION(<time-attr>, <gap-interval>)
<gap-interval>: INTERVAL 'string' timeUnit
累积窗口函数:饥漏CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)
窗口分组聚合GROUPING SETS
over函数
CUBE
Flink DataStream
增量聚合和全量聚合
增量聚合: 窗此肢行口不维护原始数据,森哗只维护中间结果,每次基于中间结果和增量数据进行聚合。
如: ReceFunction、AggregateFunction
全量聚合: 窗口需要维护全部原始数据,窗口触发进行全量聚合。如:ProcessWindowFunction
Ⅳ flink1.12.1扩展flink-sql 支持写入到sqlserver
目前业务上有同步数据到sqlServer的需求,但是flink1.12.1版本的JdbcDialects不支持SqlServerDialect,
科学上网后发现袋鼠云的flinkStreamSql已经有支持sqlserver,那就开始动手,参考实现一波
主要实现getUpsertStatement的方法,本来以为能直接一波flinkStreamSql 的实现,结果发现
报错 SQL statement must not contain ? character.
查看源码发现, flink在构建mysql的Statement,是先把需要替换的字段前面拼接了 : fieldNames,然后在org.apache.flink.connector.jdbc.statement.类的parseNamedStatement 替换成 ?号, 既然如此,就针对了buildDualQueryStatement进行修改
完整的SqlServerDialect文件
最后替换原有的flink-jar包后,就可以用类似flink定义mysql的ddl进行定义表了
注意url写法为:jdbc:jtds:sqlserver://xxx:1433;databaseName=master;
[flinkStreamSQL链接] https://github.com/DTStack/flinkStreamSQL/blob/1.11_release/sqlserver/sqlserver-sink/src/main/java/com/dtstack/flink/sql/sink/sqlserver/SqlserverDialect.java
Ⅵ flink sql 知其所以然(十三):流 join问题解决
本节是 flink sql 流 join 系列的下篇,上篇的链接如下:
废话不多说,咱们先直接上本文的目录和结论,小伙伴可以先看结论快速了解博主期望本文能给小伙伴们带来什么帮助:
书接上文,上文介绍了曝光流在关联点击流时,使用 flink sql regular join 存在的 retract 问题。
本文介绍怎么使用 flink sql interval join 解决这些问题。
flink sql 知其所以然(十二):流 join 很难嘛???(上)
看看上节的实际案例,来看看在具体输入值的场景下,输出值应该长啥样。
场景:即常见的曝光日志流(show_log)通过 log_id 关联点击日志流(click_log),将数据的关联结果进行下发。
来一波输入数据:
曝光数据:
点击数据:
预期输出数据如下:
上节的 flink sql regular join 解决方案如下:
上节说道,flink sql left join 在流数据到达时,如果左表流(show_log)join 不到右表流(click_log) ,则不会等待右流直接输出(show_log,null),在后续右表流数据代打时,会将(show_log,null)撤回,发送(show_log,click_log)。这就是为什么产生了 retract 流,从而导致重复写入 kafka。
对此,我们也是提出了对应的解决思路,既然 left join 中左流不会等待右流,那么能不能让左流强行等待右流一段时间,实在等不到在数据关联不到的数据即可。
当当当!!!
本文的 flink sql interval join 登场,它就能等。
大家先通过下面这句话和图简单了解一下 interval join 的作用(熟悉 DataStream 的小伙伴萌可能已经使用过了),后续会详细介绍原理。
interval join 就是用一个流的数据去关联另一个流的一段时间区间内的数据。关联到就下发关联到的数据,关联不到且在超时后就根据是否是 outer join(left join,right join,full join)下发没关联到的数据。
<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">interval join</figcaption>
来看看上述案例的 flink sql interval join sql 怎么写:
这里设置了 show_log.row_time BETWEEN click_log.row_time - INTERVAL ཆ' MINUTE AND click_log.row_time + INTERVAL ཆ' MINUTE 代表 show_log 表中的数据会和 click_log 表中的 row_time 在前后 10 分钟之内的数据进行关联。
运行结果如下:
如上就是我们期望的正确结果了。
flink web ui 算子图如下:
<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">flink web ui</figcaption>
那么此时你可能有一个问题,结果中的前两条数据 join 到了输出我是理解的,那当 show_log join 不到 click_log 时为啥也输出了?原理是啥?
博主带你们来定位到具体的实现源码。先看一下 transformations。
<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">transformations</figcaption>
可以看到事件时间下 interval join 的具体 operator 是 org.apache.flink.table.runtime.operators.join. 。
其核心逻辑就集中在 processElement1 和 processElement2 中,在 processElement1 和 processElement2 中使用 org.apache.flink.table.runtime.operators.join.interval.RowTimeIntervalJoin 来处理具体 join 逻辑。 RowTimeIntervalJoin 重要方法如下图所示。
TimeIntervalJoin
下面详细给大家解释一下。
join 时,左流和右流会在 interval 时间之内相互等待,如果等到了则输出数据[+(show_log,click_log)],如果等不到,并且另一条流的时间已经推进到当前这条数据在也不可能 join 到另一条流的数据时,则直接输出[+(show_log,null)],[+(null,click_log)]。
举个例子, show_log.row_time BETWEEN click_log.row_time - INTERVAL ཆ' MINUTE AND click_log.row_time + INTERVAL ཆ' MINUTE , 当 click_log 的时间推进到 2021-11-01 11:00:00 时,这时 show_log 来一条 2021-11-01 02:00:00 的数据, 那这条 show_log 必然不可能和 click_log 中的数据 join 到了,因为 click_log 中 2021-11-01 01:50:00 到 2021-11-01 02:10:00 之间的数据以及过期删除了。则 show_log 直接输出 [+(show_log,null)]
以上面案例的 show_log(左表) interval join click_log(右表) 为例(不管是 inner interval join,left interval join,right interval join 还是 full interval join,都会按照下面的流程执行):
上面只是左流 show_log 数据到达时的执行流程(即 ProcessElement1 ),当右流 click_log 到达时也是完全类似的执行流程(即 ProcessElement2 )。
小伙伴萌在使用 interval join 需要注意的两点事项:
本文主要介绍了 flink sql interval 是怎么避免出现 flink regular join 存在的 retract 问题的,并通过解析其实现说明了运行原理,博主期望你读完本文之后能了解到:
Ⅶ Flink SQL实战演练之自定义Clickhouse Connector
简介:实时数仓目前的架构是flink+clickhouse,社区目前jdbc connector不支持clickhouse的方言,所以决定自定义clickhouse connector实现flink sql写入数据到clickhouse。
目前想要实现flink sql数据落地到ck,可以修改jdbc connector的源码,增加ck方言,或者采用阿里提供的ck connector包,为了更好的理解flink connector的原理,这里自定义connector实现。
目前支持Flink写入Clickhouse的依赖哭比较多,如果数据格式固定,可以CSV的方式写入,如果不固定,可采用Json的方式写入。
Ⅷ FlinkX快速开始(一)
链接地址:胡稿纤 FlinkX
FlinkX是一个基于Flink的批流统一的数据同步工具,既可以采集静态的数据,比如MySQL,HDFS等,也可以采集实时变化的数据,比如MySQL binlog,Kafka等
1、前置
需要安装maven、java8、配置好github相关参数
2、Fork FlinX项目到自己的仓库中
2、Clone项目到本地
git clone https://github.com/liukunyuan/flinkx.git
3、安装额外的jar包
1)、cd flinkx/bin
2)、执行sh ./install_jars.sh(windows执行install_jars.bat脚本)
4、打包
1)、回到flinkx目录:cd ..
2)、执行裤仿打包命令敬肢:mvn clean package -Dmaven.test.skip=true
1、配置flink conf文件(暂时不需要安装flink)
1)、进入flinkconf目录
cd flinkconf
2)、修改flink-conf.yaml文件添加一行
rest.bind-port: 8888
2、配置mysqltomysql的json文件,路径:/Users/jack/Documents/jack-project/flinkx/flinkconf/mysql2mysql.json
3、运行任务
4、查看监控网页和log.txt文件: http://localhost:8888/
Ⅸ Flink1.13 SQL执行 oom 排查
flink on yarn cluster的模式, yarn上的应用经常发生异常, 如jobmanager的oom, zk心跳丢失, slot分配请求超时, hdfs文件已存在等等; 经过排查定位到了是flink sql的解析问题, 像count, where这类的春高圆语句在实际执行的时候变成了全量的查询
分析mp文件, 得知扒塌内存中存放了该表几乎全量的数据, 但sql加上where条件后, 实际上数据只有10来条, 是create table阶段的问题, 还念春是sql执行阶段的问题呢?