❶ Flink sql实战演练之自定义Clickhouse Connector
简介:实时数仓目前的架构是flink+clickhouse,社区目前jdbc connector不支持clickhouse的方言,所以决定自定义clickhouse connector实现flink sql写入数据到clickhouse。
目前想要实现flink sql数据落地到ck,可以修改jdbc connector的源码,增加ck方言,或者采用阿里提供的ck connector包,为了更好的理解flink connector的原理,这里自定义connector实现。
目前支持Flink写入Clickhouse的依赖哭比较多,如果数据格式固定,可以CSV的方式写入,如果不固定,可采用Json的方式写入。
❷ Flink SQL 知其所以然(五)| 自定义 protobuf format
protobuf 作为目前各大公司中最广泛使用的高效的协议数据交换格式工具库,会大量作为流式数据传输的序列化方式,所以在 flink sql 中如果能实现 protobuf 的 format 会非常有用( 目前社区已经有对应的实现,不过目前还没有 merge,预计在 1.14 系列版本中能 release )。
issue 见: https://issues.apache.org/jira/browse/FLINK-18202?filter=-4&jql=project%20%3D%20FLINK%20AND%20issuetype%20%3D%20%22New%20Feature%22%20AND%20text%20~%20protobuf%20order%20by%20created%20DESC
pr 见: https://github.com/apache/flink/pull/14376
这一节主要介绍 flink sql 中怎么自定义实现 format ,其中以最常使用的 protobuf 作为案例来介绍。
如果想在本地直接测试下:
关于为什么选择 protobuf 可以看这篇文章,写的很详细:
http://hengyunabc.github.io/thinking-about-grpc-protobuf/?utm_source=tuicool&utm_medium=referral
在实时计算的领域中,为了可读性会选择 json ,为了效率以及一些已经依赖了 grpc 的公司会选择 protobuf 来做数据序列化,那么自然而然,日志的序列化方式也会选择 protobuf 。
而官方目前已经 release 的版本中是没有提供 flink sql api 的 protobuf format 的。如下图,基于 1.13 版本。
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/
因此本文在介绍怎样自定义一个 format 的同时,实现一个 protobuf format 来给大家使用。
预期效果是先实现几种最基本的数据类型,包括 protobuf 中的 message (自定义 model)、 map (映射)、 repeated (列表)、其他基本数据类型等,这些都是我们最常使用的类型。
预期 protobuf message 定义如下:
测试数据源数据如下,博主把 protobuf 的数据转换为 json,以方便展示,如下图:
预期 flink sql:
数据源表 DDL:
数据汇表 DDL:
Transform 执行逻辑:
下面是我在本地跑的结果:
可以看到打印的结果,数据是正确的被反序列化读入,并且最终输出到 console。
目前业界可以参考的实现如下: https://github.com/maosuhan/flink-pb , 也就是这位哥们负责目前 flink protobuf 的 format。
这种实现的具体使用方式如下:
其实现有几个特点:
[图片上传失败...(image-66c35b-1644940704671)]
其实上节已经详细描述了 flink sql 对于 sourcesinkformat 的加载机制。
如图 serde format 是通过 TableFactoryHelper.discoverDecodingFormat 和 TableFactoryHelper.discoverEncodingFormat 创建的
所有通过 SPI 的 sourcesinkformt 插件都继承自 Factory 。
整体创建 format 方法的调用链如下图。
最终实现如下,涉及到了几个实现类:
具体流程:
上述实现类的具体关系如下:
介绍完流程,进入具体实现方案细节:
ProtobufFormatFactory 主要创建 format 的逻辑:
resourcesMETA-INF 文件:
主要实现反序列化的逻辑:
可以注意到上述反序列化的主要逻辑就集中在 runtimeConverter 上,即 ProtobufToRowDataConverters.ProtobufToRowDataConverter 。
ProtobufToRowDataConverters.ProtobufToRowDataConverter 就是在 ProtobufToRowDataConverters 中定义的。
ProtobufToRowDataConverters.ProtobufToRowDataConverter 其实就是一个 convertor 接口:
其作用就是将 protobuf message 中的每一个字段转换成为 RowData 中的每一个字段。
ProtobufToRowDataConverters 中就定义了具体转换逻辑,如截图所示,每一个 LogicalType 都定义了 protobuf message 字段转换为 flink 数据类型的逻辑:
源码后台回复 flink sql 知其所以然(五)| 自定义 protobuf format 获取。
本文主要是针对 flink sql protobuf format 进行了原理解释以及对应的实现。
如果你正好需要这么一个 format,直接后台回复 flink sql 知其所以然(五)| 自定义 protobuf format 获取源码吧。
当然上述只是 protobuf format 一个基础的实现,用于生产环境还有很多方面可以去扩展的。
❸ Kylin 在腾讯的平台化及 Flink 引擎实践
首先,介绍下我们为什么进行平台化改造?
我们部门为公司内其他业务线提供了各种大数据平台,如 Kylin、HBase、Spark、Flink 等等,提供公共统一的平台系统势必会牵扯到用户管理、资源隔离、部门内各个平台的融合等问题,而 Kylin 现有的用户管理、资源隔离机制并不能满足我们需求,基于此,我们对 Kylin 进行了平台化改造。平台化改造完成后,我希望在以下几个方面,能够有一些改进:
1. 用户管理
为了便于系统的管理及安全,公司内部有一套自己的认证系统,而且需要用个人账号去验证,所以 Kylin 作为一个平台对外提供服务的话,也需要接入到该系统。所以,我们新增了一个用户管理界面,该界面展示了 Kylin 平台内的所有用户。管理员可以新增任一用户到 Kylin 平台,新增用户时会填写企业微信名、用户角色以及是否激活用户。当用户登录系统时,会自动检测用户账号以及该账号是否在平台内注册,如果没有注册则无权限,反之自动登录系统。
2. 内部 Hive 兼容
由于 历史 原因,我们部门内的 Hive 版本(THive)与 Kylin 不兼容,这就导致 Kylin 无法正常访问 Kylin 集群,所以我们采用了上图所示的兼容方案。首先,我们使用社区 Hive 版本搭建一个全新的 Hive,并作为 Kylin 的默认 Hive;其次,当 kylin 加载源表时,我们是通过内部的 UPS 系统读取 THive 的元数据信息;最后,在 Load 源表到 Kylin 时,我们根据表的元数据信息在 Kylin 的 Hive 上创建一张相同的表,但该表的存储路径依旧指向 THive 的路径,而用户在构建 cube 时,则访问新创建的表,至此就解决了 Kylin 访问 THive 的问题。
3. 计算资源可配置化
目前,Kylin 配置计算资源信息有两种方式:一是在 Kylin 配置文件中配置一个全局的计算集群及队列;二是在创建工程或者 Cube 时,在扩展参数中指定集群配置。这两种配置方式在灵活性及便捷性方面都比较差,而在我们内部是有接口可以获取到某一个用户有计算资源的计算集群及计算队列的,所以,在创建工程或者 Cube 时,我们使用了下拉框选择式的方式,让用户选择提交任务的计算资源及队列,从而大大简化了用户的使用流程。
4. 通知机制
Kylin 只提供了发邮件通知的功能,而作为目前使用最广泛的工具,微信、企业微信在实时性及便捷性方面都远远胜于邮件,所以,我们提供了邮件、微信、企业微信三种方式,供用户选择。
5. 定时调度
Kylin 系统自身并没有提供定时调度功能,但基本上每家公司都有自己的统一调度平台,我们也不例外。我们通过 Kylin 提供的API接口,将 Cube 定时构建的功能作为一个插件集成到了公司内部的统一调度平台上。
6. 业务接入
做完以上平台化改造后,Kylin 平台基本具备了接入不同类型业务的能力,用户申请接入流程如上图所示。
业务使用情况:
我们团队是在今年初才开始引入 Kylin,目前已经在使用的业务主要有 QQ 音乐、腾讯视频、广点通、财付通等,Cube 的数量有 10 个,单份数据存储总量是 5 T,数据规模在 30 亿条左右。
Flink Cube Engine 原理及实践
目前,Kylin 已经支持使用 MapRece 和 Spark 作为构建引擎,而作为目前比较火的流批一体的大数据计算引擎怎能缺席?所以我使用 Flink 开发了一个高性能的构建引擎:Flink Cube Engine。
Flink Cube Engine 是腾讯基于 Kylin 插件化的 Cube Engine 架构开发的一个高性能构建引擎,目前已具备了上线使用的能力,感兴趣的同学可以体验一下,目前该引擎已经在腾讯生产环境上线 1 个月+,非常稳定而且效果不错。
Umbrella issue:
https://issues.apache.org/jira/browse/KYLIN-3758
分支:
https://github.com/apache/kylin/tree/engine-flink
1. 支持 Flink Engine 的子任务
Kylin 的一次 Cube 构建任务,包含了很多个子任务,而最重要的莫过于 Cube 构建这一步骤,所以,我们在 build 和 merge Cube 这两种任务中,优先实现了Cube 构建这一步骤,其他计算步骤依旧通过使用 MapRece 来实现。
2. 如何使用 Flink Cube Engine
选择使用 Flink Cube Engine 的方式也和选择 Map Rece 和 Spark 任务类似,我们提供了前台可视化的界面,供用户选择。
3. Flink Cube Engine 与 Spark (线上业务)
上图是我们内部业务上线 Flink Cube Engine 之后的性能对比,从图中可见,该步骤的构建耗时从 49 分钟降到了 13 分钟,优化效果比较明显。两种情况的资源配置如下:
Flink 配置为:
-ytm 4G -yjm 2G -ys 1 -p 100 -yn 100
Spark 采用的动态分配资源如下:
kylin.engine.spark-conf.spark.dynamicAllocation.enabled=true
kylin.engine.spark-conf.spark.dynamicAllocation.minExecutors=2
kylin.engine.spark-conf.spark.dynamicAllocation.maxExecutors=1000
kylin.engine.spark-conf.spark.dynamicAllocation.executorIdleTimeout=300
kylin.engine.spark-conf.spark.shuffle.service.enabled=true
kylin.engine.spark-conf.spark.shuffle.service.port=7337
虽然,Spark 采用的是动态分配资源,但在任务执行过程中,我们观察到 Spark实际分配的资源远比 Flink 要多的多。
那为什么性能提升会那么明显呢?
4. Flink Cube Engine 的优化
性能的提升,无非有两方面的原因,一是参数的优化,二是代码的优化。
1) 调参
影响 Flink 任务性能主要有几个核心参数:并行度、单个 TM slot 数目、TM container 数目,其中单个 TM container 数目=并行度/单个 TM slot 数目。
我们调优的过程采用了控制变量法,即:固定并行度不变、固定 Job 总内存数不变。通过不断的调整单个 TM 的 slot 数目,我们发现 如果单个 TM 的 slot 数目减少,拉起更多的 TM container 性能会更好。
此外,我们还使用了对象复用、内存预分配等方法,发现没有对性能提升起到太大的效果。
2) 代码优化(合并计算)
在实现 Flink Cube Engine 的时候,一开始我们使用了 Map/Rece 两个算子,发现性能很差,比 Spark 的性能还要差很多,后来我们通过调整使用了 Flink 的 mapPartition/receGroup 两个算子,性能就有了明显的提升。
Flink Cube Engine 下一步的计划:
1. 全链路 Flink
如上所述,目前 Cube 构建过程中,只有最关键的 cube 构建这一子任务使用了 Flink,而其他子任务仍然使用的是 MapRece,我们下一步会继续完善 Flink Cube Engine,将所有的子任务都使用 Flink 来构建。
2. Flink 升级到 1.9
Flink 最近发布了 1.9.0,该版本包含了很多重要特性且性能也有了一定提升,所以,我们会把 Flink Cube Engine 使用的 Flink 版本升级到1.9.0。
❹ Flink SQL实战演练之自定义Table Format
简介:接着上次Flink CDC继续聊,提到这块,不得不重点说下canal-json format了,canal json format对json format进行了封装,负责把binlog形式的json数据转化成了Flink能够识别的RowData数据,当然也包括数据的描述信息封装类RowType。笔者想实现根据RowKind进行数据的过滤,目前可以通过修改canal format的源数据来实现,也可以通过将changelog流以changelog json的形式回写Kafka。
基于目前对table format的了解,这里自定义event json format,用来处理事件流数据,因为事件流字段不固定,可能只有少部分字段是固定的,其他字段都是扩展的,所以笔者想实现用户自定义schema指定公共字段,然后其他字段以json的行为存在metadata中的default字段中。
Table Format作为Connector组件单独用于序列化和反序列化内部数据的模块而单独存在,多个Connector可以公用。自定义Table Format可以让大家更好的理解Flink SQL时如何将外部数据转化为内部可以识别的RowData数据结构的,从而在排查问题的时候能准确定位到具体位置。
❺ Flink SQL 写入 Hive表的性能问题
写入Hive表的性能,每秒写入记录数,发现性能并不乐观,上有节点背压严重。
Hive Table DDL:
而写入HDFS文件的性能,每秒写入记录数,性能符合期待。
HDFS文件的DDL:
翻阅Flink的PR,十几天前,阿里Flink的开发同学已经注意到了这个问题,我们将之吸收到测试环境,编译替换lib下jar包,重新测试,性能确实up了,单并发升至5W每秒,上游节点才稍微有背压。
[FLINK-19121][hive] Avoid accessing HDFS frequently in HiveBulkWriterFactory
所以,Flink的新特性从发布到应用线上,稳定性与性能上都不能过于乐观、听信于官方宣传,
司内另一教训就是过早在热数据存储层启用了Hadoop的纠删码,导致问题不断,被迫退化到副本机制。
这与前期调研、验证不足,对该特性过于轻信有莫大关系,教训也是深刻。
底层采用Reed-Solomon(k,m)算法,RS是一种常用的纠删码算法,通过矩阵运算,可以为k位数据生成m位校验位,根据k和m的取值不同,可以实现不同程度的容错能力,是一种比较灵活的纠删码算法。
HDFS纠删码技术能够降低数据存储的冗余度,以RS(3,2)为例,其数据冗余度为67%,相比Hadoop默认的200%大为减少。但是纠删码技术存储数据和数据恢复都需要 消耗cpu进行计算,实际上是一种以时间换空间的选择,因此比较适用的场景是对冷数据的存储 。冷数据存储的数据往往一次写入之后长时间没有访问,这种情况下可以通过纠删码技术减少副本数。
❻ flink1.12.1扩展flink-sql 支持写入到sqlserver
目前业务上有同步数据到sqlServer的需求,但是flink1.12.1版本的JdbcDialects不支持SqlServerDialect,
科学上网后发现袋鼠云的flinkStreamSql已经有支持sqlserver,那就开始动手,参考实现一波
主要实现getUpsertStatement的方法,本来以为能直接一波flinkStreamSql 的实现,结果发现
报错 SQL statement must not contain ? character.
查看源码发现, flink在构建mysql的Statement,是先把需要替换的字段前面拼接了 : fieldNames,然后在org.apache.flink.connector.jdbc.statement.类的parseNamedStatement 替换成 ?号, 既然如此,就针对了buildDualQueryStatement进行修改
完整的SqlServerDialect文件
最后替换原有的flink-jar包后,就可以用类似flink定义mysql的ddl进行定义表了
注意url写法为:jdbc:jtds:sqlserver://xxx:1433;databaseName=master;
[flinkStreamSQL链接] https://github.com/DTStack/flinkStreamSQL/blob/1.11_release/sqlserver/sqlserver-sink/src/main/java/com/dtstack/flink/sql/sink/sqlserver/SqlserverDialect.java
❼ 流批一体不只有Flink,还有实时数据模型
通常来讲,数据仓库的建设,都是以离线作为主要的密报,下游的应用,不论是报表还是接口,所提供的数据也大多是T-1时效性。
但伴随着业务的变化,当离线做到没什么可以继续做的时候,实时就会被拿出来,作为新一个阶段的目标进行攻克。
在流批一体建设之前,这种实时诉求通常会开发成分钟级的任务,通过近实时的方案来解决业务的问题,但分钟级会带来诸如任务过多、资源挤占较大、无法支持复杂逻辑等问题。
因此专门支持实时计算的框架,比如早期的Storm,能够尝试从纯实时的角度解决业务问题,就被拿出来作为尝试。然而Storm的局限性也很大,因为那会的任务开发只能通过Java的方式来进行,与Hive所推崇的纯SQL方案相比,上手难度大了不少,同时两套代码的逻辑几乎没有可比性,这种方案也就一直没有什么声音。
尽管实时技术有各种缺陷,但作为一种能够很容易讲清楚价值的项目,同时又非常便于向上汇报的技术方案,实时技术还是被或多或少的做了起来。在大多数的公司里,实时和离线就会有不同的团队进行维护,或者是同一个团队,但分成不同的项目来执行。这个阶段,优先高效的把业务做起来,哪怕场景再简单,但能够证明实时有价值和前景,这个阶段的目标就算完成了。
以上的各种方案,难免会带来三个特别难以解决的问题:
(1)数据的口径上,实时和离线很容易不统一;
(2)数据模型的规范上,实时和离线也往往是分开建设;
(3)即便是同一种口径和同一种规范,实时和离线也要分成两套代码来维护。
这三个问题短时间内会被高速发展掩盖掉,但当业务对实时的诉求越来越多、压力越来越大的时候,口径和代码的不统一,就会越来越成为阻碍敏捷开发的障碍,需要有方案进行解决。
后来Flink出现了,带来了流批一体的全新方案,这个问题便出现了解决的曙光,这也比较接近我们对于实时计算的理想方案,因为其意义堪比Hive,也成为了各个大厂面试的标配问题。
然而,仅仅学会Flink是不够的,因为流批一体带来的并不仅仅是技术方案或者是框架的改变,同样带来了数据模型的改变,这就要求我们从数据模型上,而不是技术方案上,来制定我们的实时方案。
那么我们如何理解“实时数据模型”这件事情呢?
通常而言,我们关心的内容,包括如下几个方面:
(1)实时数据源与离线数据源存在差异,导致相同的字段,取值或者类型会存在不相等的情况;
(2)实时和离线由于底层执行机制的不同,通常需要维护两套代码,会带来诸如口径不统一、质量检测难的问题;
(3)产品逻辑变化较快时,离线模型修改相对容易,但实时模型需要考虑压测、削峰、重启等技术问题,维护成本非常高昂。
数据仓库之所以能够普及并被业务接受,正是因为其模型能够屏蔽掉底层差异的问题,并且有相对可靠的数据质量监控方法,并且变更成本非常低。而实时数仓如果想要替代掉离线数仓,以上的问题通常是需要一些模型设计甚至是平台工具的来解决,这些问题解决的重要性,并不比Flink弱。
我们先从比较可控的模型层面说起。
在离线的概念里,数仓模型设计成了DWD/DWS/ADS三个层级,原本的概念是DWD面向事实表的构建,DWS面向公共指标的统一,ADS负责灵活的口径变化问题。
在离线的概念里,DWD/DWS/ADS三个层级需要保留,但负责的目标会有一些变化,同时还需要增加存储统一层,也就是以TiDB/Holo为代表的数据库,来承担服务分析一体化的诉求。
让我们先看DWD层,DWD承担了屏蔽实时离线链路差异的问题,最重要的作用是保证表结构的统一及字段内容的对齐。DWD最重要的意义,是保证离线表和实时表,其表结构和字段概念是相同的。
为什么这么强调?试想一下,在离线场景下,我们可以在DWD上灵活的增加各种统计标签,或者是将维度退化到事实表,都是一些left join或者是服务端直接打标可以解决的事情。但在实时场景下,这会变成多流join或者是缓存等更复杂的技术场景,导致这些信息并不能有效的记录到DWD,因此DWD的设计就要产生一些变化,有一些内容在实时场景下无法准确记录,这一类信息需要标识到对应的字段描述上,下游使用时才不会出错。
同时,实时和离线存储数据的介质,也必然有一些区别。例如离线可以存在HDFS上,实时则可能视情况保存在数据库、HDFS甚至是内存中,这时候对于字段格式、读取方式都会有差异,设计表时其约束条件也会更多。
因而,DWD更多承担了逻辑统一的职责,依旧以事实表为基础,但约束条款要比离线更多。
再看一下DWS层,离线上DWS是负责口径统一的重要一环,将通用的维度和口径计算方法抽象出现,以提供跨数据域的灵活使用。但在实时场景下,这一类的维护收益通常都比较低,不仅因为实时只看当天的数据,也是因为实时本身的维度难度就较大,多一层模型其收益会急速下降,因而大多数时候会忽略掉DWS的建设,ADS直接引用DWD进行统计。
然而,DWS毕竟存储的内容要比DWD少很多,因此如果计算资源瓶颈非常明显,或者是业务场景不需要分析实时明细数据的情况下,或者是DWD的下游引用过多时,DWS可以承担削峰的重任,通过减少数据量以应对大促等场景,还是有一定意义的。
接下来就是最重要的ADS层,在这一层上,逻辑统一、口径统一、大促削峰在前置模型上都得到了一定程度的解决,ADS则像离线一样承担了应对需求变化的重任。
但ADS所面临的情况和离线还是有所不同的,因为ADS的任务启动,不仅要启动一个离线的跑批任务,还要同时启动一个实时的流式任务,而ADS往往会同时统计离线+实时的结果,以应对同比、环比等场景。
这时候很多具体Case要具体分析了,因为特定场景的坑会非常多。例如最常见的“同比”,要对比今年和去年的结果变化,离线往往会统计分小时的结果,但实时会累计起始时刻到当期时刻的结果,因而当一个小时没有结束的时候,这个同比的波动变化会非常大,给人一种“数据是错误的”印象,新手很容易踩这个坑,从而被业务质疑。
因此,针对累计统计指标,从代码设计上就要考虑到这种情况,都根据时间字段统计起始到当前时刻的结果的,在代码逻辑上会要求一些统计技巧。
很多时候,因为业务指标变化太快,改实时代码是来不及的,这时候一部分的工作量甚至需要报表工具的数据集来解决,改动查询sql,要比改动任务来的快捷多了。但这部分的能力,其实是依赖于存储工具的,个人认为可以分到存储统一层来解决。
最后是存储统一层,因为一些特殊的场景,比如实时分析明细数据,或者是不确定时间周期的多天统计结果,如果依赖Flink SQL来解决是有些不现实的,因而这部分的压力需要数据库来承担。
简单讲,就是将明细做轻度的汇总后,直接写到数据库,实时更新,下游自定义条件,并直接读库统计结果。这种场景既要求数据库有OLAP的计算能力,也要有OLTP的稳定特点,因而TiDB和Holo这一类HTAP的引擎就变得非常重要。
因为多了实时的部分,因此过去面向离线的开发工具,也需要有一些特定的改造,以适应实时的开发和运维诉求。
对于开发工具而言,其目标集中在四个场景上:元数据定义与获取、数据建模、开发与测试、运维与监控。
其次讲数据建模,因为建模的理论已经稳定了有些年头了,绝大多数场景下都是按照既定的方案来执行。过去离线当道时,规范执行的弱一些不是什么大问题,但流批一体当道的年代,规范是需要强约束的,这就对了开发工具提出了一定的要求,是否能够从平台层面上对规范进行内置,并以此来约束开发的同学,降低不规范模型对后期维护带来的压力。
这种建模能力的代表有两种,一种是规范表的命名,填写相应的分层+主题域+数据域+统计刷新方式,从源头上规范表的目标和作用;一种是规范指标的定义和使用,例如原子指标还是派生指标,统计周期多少,业务限定用语如何规范,统计粒度怎么填写。
在实际开发中,通过工具的限制,如果规范可以做的好,代码是可以自动生成出来的。当然,以上的功能,都属于通过牺牲开发效率,来提升数据质量的范畴,使用时需要根据团队的情况来限定。
再次是开发和测试,这是平台提供的最重要的能力。在开发层面,就是代码的预编译能力+发布功能。预编译不仅要检查代码的逻辑是否正确,同时对于代码中依赖的其他数据源,获取到的元数据信息是否准确,至少字段的命名不会有大的问题。当代码预编译通过,发布上线后,还需要检测当前是否有资源支持任务启动,并且上游的消息队列是否是启动的状态。
实时的测试一直都是比较大的问题,它不像离线可以启动一个SQL任务看看结果,实时在每个阶段的输入和输出,是需要通过平台支持的日志打印功能来进行辅助的。很多时候我们会新建一个测试专用的topic来测试结果,但对于流量较大的线上任务而言,这种方式无法像离线区分Dev环境一样,能够对资源进行隔离,因而如果能够支持圈定数据的输入和打印输出,对于测试的效率而言无疑是最佳的。
最后要提到的是运维与监控能力。运维能力是指根据输入的RPS,或者是cu使用情况,或者是任务的整体延迟,提供相应的参数调优能力,通过参数来调整任务的执行情况。并且能够根据以上指标的变化,自定义相应的阈值,提供相应的告警能力,通过短信或者是消息工具的方式触达任务维护者。
实时与离线有一些不同的是,离线可以通过增加一个监控节点的方式,通过group by判断数据是否重复,而实时任务则非常依赖Flink自身的一致性能力,因而发现和解决问题的成本更高。
其实做到运维这个环节,对人的要求其实是更高的。因为流批一体在运维上会带来一个好处,即实时任务和离线任务能够错峰执行,实时在白天压力大,而离线在晚上压力大。但同样的,这种方式对于维护者而言更加痛苦,因为不仅晚上要熬夜值班,白天同样不能休息,在大促期间甚至需要轮班来维护任务,可以说是“汇报一时爽,痛苦长相伴”。
从远处来看,流任务和批任务,在自身的机制上就存在非常大的差异,批程序面上的是特定时间内相对静态的数据,而流程序处理的则是change-log,虽然有可能数据在表结构层面,通过数据模型的设计来保持一致,但是在语义层面,其根本还是不一样的。这一点可能是最制约批流一体发展的问题,也是最难实现统一或者永远也不可能统一的。
综上,对于实时模型,开发工具需要将监控实时部分的能力进行补全,就像DWD层需要分别维护实时和离线两套架构一样,开发工具也需要分别维护两套架构的结果,因而现阶段的实时开发,还做不到降低维护和开发的成本,只能减轻其中部分环节的工作量。
以上讲了很长时间的实时模型,但从实际的效果上看,业务并不会感知到多么明显的技术变化,相反会有一种“面子工程”的感觉在里面。
当然,我并不否认实时的价值,在“搜广推”这三个技术占主导的领域内,作用还是很大的。但实时毕竟要比离线的内容,更加的难以理解,出现问题的排查成本也更高。这种复杂性使得我们在应对变化时,往往做不出有效的应对,就会变得特别被动。
因而,说一句事后的话,就是“实时的价值取决于业务方,而不是技术方”。只有业务对实时痛点强烈的场景下,我们做如此复杂的研究和应对,才能体现出自己的价值,更多的时候,是在“王婆卖瓜,自卖自夸”。有这种投入,还不如多招几个分析师更靠谱和实在。
本人之前的文章《天下数据,唯快不破》,重点强调了一个“快”字。但“天下熙熙皆为利来,天下攘攘皆为利往”,这个快更多的是在讲应对“变化”的快,而不是“技术”自己的快。
所以,为了以后的职业发展,我们要跟进实时技术的变化,但从自身的工作角度出发,如何应对业务的变化,才是自己要关心的课题。
❽ 基于Flink的实时计算平台的构建
一、系统架构
1. 接入层
Canal、Flume、Kafka
针对业务系统数据,Canal监控Binlog日志,发送至kafka;
针对日志数据,由Flume来进行统一收集,并发送至kafka。
消息队列的数据既是离线数仓的原始数据,也是实时计算的原始数据,这样可以保证实时和离线的原始数据是统一的。
2. 计算层
Flink
有了源数据,在 计算层 经过Flink实时计算引擎做一些加工处理,然后落地到存储层中不同存储介质当中。
3. 存储层
HBase、Kafka、ES、Mysql、Hive、Redis
不同的 存储介质 是通过不同的应用场景来选择。
4. 数据应用层
风控、模型、图谱、大屏展示
通过存储层应用于不同的 数据应用 ,数据应用可能是我们的正式产品或者直接的业务系统
二、技术实现
1. 计算引擎
实时计算引擎的功能要求
提供高级 API,支持常见的数据操作比如关联聚合,最好是能支持 SQL
具有状态管理和自动支持久化方案,减少对存储的依赖
可靠的容错机制,低延时,最好能够保证Exactly-once
Flink的优势
Flink的API、容错机制与状态管理都满足实时数仓计算引擎的需求
Flink高吞吐、低延时的特性
端到端的Exactly-once
WaterMark&Event Time的支持
Flink 不仅支持了大量常用的 SQL 语句,还有丰富的数据类型、内置函数以及灵活的自定义函数,基本覆盖了我们的开发场景
2. 存储引擎
根据不同的业务场景,使用最适合的存储引擎:
Kafka主要用于中间数据表的存储
ES主要针对日志数据的存储和分析
HBase、Redis可用于维表存储
Hive用于数据校验
Mysql可以用于指标计算结果的存储
三、数据分层
数据源:目前数据源主要是Binlog,通过Canal监控各个业务系统的Mysql,将binlog发送至kafka。
ODS层:主要将Binlog数据存储至Kafka,这一层不对数据进行任何操作,存储最原始的数据,Binlog 日志在这一层为库级别,即:一个库的变更数据存放在同一个 Kafka Topic 中。
DWD层:主要对数据进行简单的清洗。拆分主题,将库级别的主题拆分为表级别;打平数据,将data数组格式打平。
DWS层:主要根据不同的业务的需求,将该需求所涉及到的表进行join所得。
APP层:根据指标计算需求,对数据进行处理后,存储HBase,为了方便模型查询,主要将表存储为索引表和明细表,直接对数据进行指标计算后,将计算结果存储到HBase。
四、数据监控及校验
1. 数据监控
目前数据的监控的架构是pushgateway + Prometheus + Grafana
数据监控主要是接入Flink的Metric,通过Grafana对Flink系统指标及自定义指标进行图形化界面的展示,对关键指标进行监控报警
2. 数据校验
目前数据的监控的架构是Grafana + Mysql
Grafana用于监控指标的展示及相关阈值数据的报警,Mysql主要用于监控数据的存储
将每个服务的source收到的数据、sink发出的数据,根据表的不同将数据关键字段写入mysql中,通过统计各个阶段各个表中的数据条数,对数据完整性进行监控校验,若出现数据缺时,先查找原因,然后指定时间戳重启服务
五、系统管理
元数据管理
表,字段元数据管理,实时感知元数据的变化,大幅度降低使用数据的成本。
系统配置
对应用启动参数及相关配置参数的管理,对任务进行灵活配置及管理。
血缘管理
主要是梳理实时计算平台中数据依赖关系,以及实时任务的依赖关系,从底层ODS到DWD再到DWS,以及APP层用到哪些数据,将整个链度串联起来。
六、问题及解决方案
1. 数据倾斜
由于要拆分主题,要以table为key对数据进行keyBy,但是由于每个表的数据量相差较大,会出现数据倾斜
解决方案:
加盐,给key加前缀
前缀不能随便加,为了保证同一id的数据在相同的分区中,所以根据id_table进行keyBy
2. 数据重复
任务在进行自动或手动重启时,为了保证数据不丢失,数据会出现重复计算的问题,如果下游只是对数据进行HBase存储的话,由于幂等性,这种重复可以解。但是,如果下游要对数据进行聚合,这样会导致数据被计算多次,影响计算结果的准确性
解决方案:
上游在对数据进行发送时,对kafka procer 进行 exactly once的设置
在对数据统计时进行数据去重
3. 数据延时
由于所处理的数据表的大小不一样,处理大表时,会出现数据延时的问题。
解决方案:
针对大表数据增加并行度
4.数据乱序
由于Flink kafka procer默认是根据hash对数据进行随机分区,kafka consumer在对数据进行消费时,每个分区消费速度不同,这样最终在存储数据时,就会出现乱序即相同的id会出现老数据覆盖新数据的问题
解决方案:
对kafka每个阶段进行自定义分区,将id相同的数据分到同一个分区,保证同一id的数据的有序性
由于整个数据处理过程中可能会出现shuffle,导数数据重新乱序,所以在对数据存储前对数据进行排序
对数据进行排序的关键点时要保证每条数据的唯一性,即要有标记数据先后顺序的字段
5 . 数据唯一标记(很重要)
由于要对数据进行去重或者排序,所以要保证数据的唯一性
解决办法:
使用时间戳不可以,因为数据量很大的情况下,同一时间会处理上百条数据
在最初发出数据的时候,为数据打上标记,使用 partition + offset + idx 的组合来确认数据的唯一性及顺序性
6. 数据可靠性
我们对服务重启或对服务升级时,可能会出现数据的丢失
解决方案:
结合Flink 的checkpoint及savepoint机制保证数据的可靠性
开启Flink的checkpoint机制,服务进行自动重启时,会自动读取上次保存在checkpoint中offset,或者我们指定offset进行数据消费
对服务进行升级时,先将服务的状态保存至savepoint中,重启时指定savepoint进行服务启动,保证数据不丢失
7. 无感升级
由于我们目前数据量比较庞大,且在对服务进行升级时,耗时较长,会影响调用方的使用。
解决办法:
在对服务进行升级时,将数据写入备用库,等数据追上且服务稳定运行后,再将存储库进行切换
❾ flink 读取mysql并使用flink sql
1.mysql连接
2.flink sql
3.dependency
❿ flink sql 知其所以然(十三):流 join问题解决
本节是 flink sql 流 join 系列的下篇,上篇的链接如下:
废话不多说,咱们先直接上本文的目录和结论,小伙伴可以先看结论快速了解博主期望本文能给小伙伴们带来什么帮助:
书接上文,上文介绍了曝光流在关联点击流时,使用 flink sql regular join 存在的 retract 问题。
本文介绍怎么使用 flink sql interval join 解决这些问题。
flink sql 知其所以然(十二):流 join 很难嘛???(上)
看看上节的实际案例,来看看在具体输入值的场景下,输出值应该长啥样。
场景:即常见的曝光日志流(show_log)通过 log_id 关联点击日志流(click_log),将数据的关联结果进行下发。
来一波输入数据:
曝光数据:
点击数据:
预期输出数据如下:
上节的 flink sql regular join 解决方案如下:
上节说道,flink sql left join 在流数据到达时,如果左表流(show_log)join 不到右表流(click_log) ,则不会等待右流直接输出(show_log,null),在后续右表流数据代打时,会将(show_log,null)撤回,发送(show_log,click_log)。这就是为什么产生了 retract 流,从而导致重复写入 kafka。
对此,我们也是提出了对应的解决思路,既然 left join 中左流不会等待右流,那么能不能让左流强行等待右流一段时间,实在等不到在数据关联不到的数据即可。
当当当!!!
本文的 flink sql interval join 登场,它就能等。
大家先通过下面这句话和图简单了解一下 interval join 的作用(熟悉 DataStream 的小伙伴萌可能已经使用过了),后续会详细介绍原理。
interval join 就是用一个流的数据去关联另一个流的一段时间区间内的数据。关联到就下发关联到的数据,关联不到且在超时后就根据是否是 outer join(left join,right join,full join)下发没关联到的数据。
<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">interval join</figcaption>
来看看上述案例的 flink sql interval join sql 怎么写:
这里设置了 show_log.row_time BETWEEN click_log.row_time - INTERVAL ཆ' MINUTE AND click_log.row_time + INTERVAL ཆ' MINUTE 代表 show_log 表中的数据会和 click_log 表中的 row_time 在前后 10 分钟之内的数据进行关联。
运行结果如下:
如上就是我们期望的正确结果了。
flink web ui 算子图如下:
<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">flink web ui</figcaption>
那么此时你可能有一个问题,结果中的前两条数据 join 到了输出我是理解的,那当 show_log join 不到 click_log 时为啥也输出了?原理是啥?
博主带你们来定位到具体的实现源码。先看一下 transformations。
<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">transformations</figcaption>
可以看到事件时间下 interval join 的具体 operator 是 org.apache.flink.table.runtime.operators.join. 。
其核心逻辑就集中在 processElement1 和 processElement2 中,在 processElement1 和 processElement2 中使用 org.apache.flink.table.runtime.operators.join.interval.RowTimeIntervalJoin 来处理具体 join 逻辑。 RowTimeIntervalJoin 重要方法如下图所示。
TimeIntervalJoin
下面详细给大家解释一下。
join 时,左流和右流会在 interval 时间之内相互等待,如果等到了则输出数据[+(show_log,click_log)],如果等不到,并且另一条流的时间已经推进到当前这条数据在也不可能 join 到另一条流的数据时,则直接输出[+(show_log,null)],[+(null,click_log)]。
举个例子, show_log.row_time BETWEEN click_log.row_time - INTERVAL ཆ' MINUTE AND click_log.row_time + INTERVAL ཆ' MINUTE , 当 click_log 的时间推进到 2021-11-01 11:00:00 时,这时 show_log 来一条 2021-11-01 02:00:00 的数据, 那这条 show_log 必然不可能和 click_log 中的数据 join 到了,因为 click_log 中 2021-11-01 01:50:00 到 2021-11-01 02:10:00 之间的数据以及过期删除了。则 show_log 直接输出 [+(show_log,null)]
以上面案例的 show_log(左表) interval join click_log(右表) 为例(不管是 inner interval join,left interval join,right interval join 还是 full interval join,都会按照下面的流程执行):
上面只是左流 show_log 数据到达时的执行流程(即 ProcessElement1 ),当右流 click_log 到达时也是完全类似的执行流程(即 ProcessElement2 )。
小伙伴萌在使用 interval join 需要注意的两点事项:
本文主要介绍了 flink sql interval 是怎么避免出现 flink regular join 存在的 retract 问题的,并通过解析其实现说明了运行原理,博主期望你读完本文之后能了解到: