当前位置:首页 » 编程语言 » flinksql开源
扩展阅读
webinf下怎么引入js 2023-08-31 21:54:13
堡垒机怎么打开web 2023-08-31 21:54:11

flinksql开源

发布时间: 2023-05-30 16:38:26

1. 数据治理大数据湖仓一体开源框架

数据治坦拍理大数据湖仓一体开源框架分为4部分:

1、数据源

业务库数据、用户日志、系统日贺盯志、爬虫数据

2、构建集群

Hadoop,HDFS,Yarn

3.1 数据采集

数据采集工具:Sqoop、Flume、Canal、Sparkstreaming

3.2 数据预处理

数据预处理工具:消息系统Kafka,宽表工具Sparksql、FlinkSql

3.3数据存储

Hbase数据让拍羡库集群、Clickhouse

3.4数据挖掘

Spark,Flink

4、数据可视化

FineBI 、PowerBI

开发工具:Intellij IDEA

2. Flink:特性、概念、组件栈、架构及原理分析

简单之美 | Apache Flink:特性、概念、组件栈、架构及原理分析
http://shiyanjun.cn/archives/1508.html

Apache Flink是一个面向分布式数据流处理和批量数据处理的开源计算平台,它能够基于同一个Flink运行时(Flink Runtime),提供支持流处理和批处理两种类型应用的功能。现有的开源计算方案,会把流处理和批处理作为两种不同的应用类型,因为他们它们所提供的SLA是完全不相同的:流处理一般需要支持低延迟、Exactly-once保证,而批处理需要支持高吞吐、高效处理,所以在实现的时候通常是分别给出两套实现方法,或者通过一个独立的开源框架来实现其中每一种处理方案。例如,实现批处理的开源方案有MapRece、Tez、Crunch、Spark,实现流处理的开源方案有Samza、Storm。Flink在实现流处理和批处理时,与传统的一些方案完全不同,它从另一个视角看待流处理和批处理,将二者统一起来:Flink是完全支持流处理,也就是说作为流处理看待时输入数据流是无界的;批处理被作为一种特殊的流处理,只是它的输入数据流被定义为有界的。基于同一个Flink运行时(Flink Runtime),分别提供了流处理和批处理API,而这两种API也是实现上层面向流处理、批处理类型应用框架的基础。
基本特性
关于Flink所支持的特性,我这里只是通过分类的方式简单做一下梳理,涉及到具体的一些概念及其原理会在后面的部分做详细说明。
流处理特性
支持高吞吐、低延迟、高性能的流处理
支持带有事件时间的窗口(Window)操作
支持有状态计算的Exactly-once语义
支持高度灵活的窗口(Window)操作,支持基于time、count、session,以及data-driven的窗口操作
支持具有Backpressure功能的持续流模型
支持基于轻量级分布式快照(Snapshot)实现的容错
一个运行时同时支持Batch on Streaming处理和Streaming处理
Flink在JVM内部实现了自己的内存管理
支持迭代计算
支持程序自动优化:避免特定情况下Shuffle、排序等昂贵操作,中间结果有必要进行缓存

API支持
对Streaming数据类应用,提供DataStream API
对批处理类应用,提供DataSet API(支持Java/Scala)

Libraries支持
支持机器学习(FlinkML)
支持图分析(Gelly)
支持关系数据处理(Table)
支持复杂事件处理(CEP)

整合支持
支持Flink on YARN
支持HDFS
支持来自Kafka的输入数据
支持Apache HBase
支持Hadoop程序
支持Tachyon
支持ElasticSearch
支持RabbitMQ
支持Apache Storm
支持S3
支持XtreemFS

基本概念
Stream & Transformation & Operator
用户实现的Flink程序是由Stream和Transformation这两个基本构建块组成,其中Stream是一个中间结果数据,而Transformation是一个操作,它对一个或多个输入Stream进行计算处理,输出一个或多个结果Stream。当一个Flink程序被执行的时候,它会被映射为Streaming Dataflow。一个Streaming Dataflow是由一组Stream和Transformation Operator组成,它类似于一个DAG图,在启动的时候从一个或多个Source Operator开始,结束于一个或多个Sink Operator。下面是一个由Flink程序映射为Streaming Dataflow的示意图,如下所示:

比如从Source[1]到map()[1],它保持了Source的分区特性(Partitioning)和分区内元素处理的有序性,也就是说map()[1]的Subtask看到数据流中记录的顺序,与Source[1]中看到的记录顺序是一致的。
Redistribution模式

这种模式改变了输入数据流的分区,比如从map()[1]、map()[2]到keyBy()/window()/apply()[1]、keyBy()/window()/apply()[2],上游的Subtask向下游的多个不同的Subtask发送数据,改变了数据流的分区,这与实际应用所选择的Operator有关系。另外,Source Operator对应2个Subtask,所以并行度为2,而Sink Operator的Subtask只有1个,故而并行度为1。
Task & Operator Chain
在Flink分布式执行环境中,会将多个Operator Subtask串起来组成一个Operator Chain,实际上就是一个执行链,每个执行链会在TaskManager上一个独立的线程中执行,如下图所示:

在Flink集群启动的时候,TaskManager会向JobManager注册,如果注册成功,则JobManager会向TaskManager回复消息AcknowledgeRegistration。
SubmitJob

Flink程序内部通过Client向JobManager提交Flink Job,其中在消息SubmitJob中以JobGraph形式描述了Job的基本信息。
CancelJob

请求取消一个Flink Job的执行,CancelJob消息中包含了Job的ID,如果成功则返回消息CancellationSuccess,失败则返回消息CancellationFailure。
UpdateTaskExecutionState

TaskManager会向JobManager请求更新ExecutionGraph中的ExecutionVertex的状态信息,更新成功则返回true。
RequestNextInputSplit

运行在TaskManager上面的Task,请求获取下一个要处理的输入Split,成功则返回NextInputSplit。
JobStatusChanged

ExecutionGraph向JobManager发送该消息,用来表示Flink Job的状态发生的变化,例如:RUNNING、CANCELING、FINISHED等。
TaskManager
TaskManager也是一个Actor,它是实际负责执行计算的Worker,在其上执行Flink Job的一组Task。每个TaskManager负责管理其所在节点上的资源信息,如内存、磁盘、网络,在启动的时候将资源的状态向JobManager汇报。TaskManager端可以分成两个阶段:
注册阶段

TaskManager会向JobManager注册,发送RegisterTaskManager消息,等待JobManager返回AcknowledgeRegistration,然后TaskManager就可以进行初始化过程。
可操作阶段

该阶段TaskManager可以接收并处理与Task有关的消息,如SubmitTask、CancelTask、FailTask。如果TaskManager无法连接到JobManager,这是TaskManager就失去了与JobManager的联系,会自动进入“注册阶段”,只有完成注册才能继续处理Task相关的消息。
Client
当用户提交一个Flink程序时,会首先创建一个Client,该Client首先会对用户提交的Flink程序进行预处理,并提交到Flink集群中处理,所以Client需要从用户提交的Flink程序配置中获取JobManager的地址,并建立到JobManager的连接,将Flink Job提交给JobManager。Client会将用户提交的Flink程序组装一个JobGraph, 并且是以JobGraph的形式提交的。一个JobGraph是一个Flink Dataflow,它由多个JobVertex组成的DAG。其中,一个JobGraph包含了一个Flink程序的如下信息:JobID、Job名称、配置信息、一组JobVertex等。
组件栈
Flink是一个分层架构的系统,每一层所包含的组件都提供了特定的抽象,用来服务于上层组件。Flink分层的组件栈如下图所示:

了解YARN的话,对上图的原理非常熟悉,实际Flink也实现了满足在YARN集群上运行的各个组件:Flink YARN Client负责与YARN RM通信协商资源请求,Flink JobManager和Flink TaskManager分别申请到Container去运行各自的进程。通过上图可以看到,YARN AM与Flink JobManager在同一个Container中,这样AM可以知道Flink JobManager的地址,从而AM可以申请Container去启动Flink TaskManager。待Flink成功运行在YARN集群上,Flink YARN Client就可以提交Flink Job到Flink JobManager,并进行后续的映射、调度和计算处理。
Runtime层

Runtime层提供了支持Flink计算的全部核心实现,比如:支持分布式Stream处理、JobGraph到ExecutionGraph的映射、调度等等,为上层API层提供基础服务。
API层

API层主要实现了面向无界Stream的流处理和面向Batch的批处理API,其中面向流处理对应DataStream API,面向批处理对应DataSet API。
Libraries层

该层也可以称为Flink应用框架层,根据API层的划分,在API层之上构建的满足特定应用的实现计算框架,也分别对应于面向流处理和面向批处理两类。面向流处理支持:CEP(复杂事件处理)、基于SQL-like的操作(基于Table的关系操作);面向批处理支持:FlinkML(机器学习库)、Gelly(图处理)。
内部原理
容错机制
Flink基于Checkpoint机制实现容错,它的原理是不断地生成分布式Streaming数据流Snapshot。在流处理失败时,通过这些Snapshot可以恢复数据流处理。理解Flink的容错机制,首先需要了解一下Barrier这个概念:Stream Barrier是Flink分布式Snapshotting中的核心元素,它会作为数据流的记录被同等看待,被插入到数据流中,将数据流中记录的进行分组,并沿着数据流的方向向前推进。每个Barrier会携带一个Snapshot ID,属于该Snapshot的记录会被推向该Barrier的前方。因为Barrier非常轻量,所以并不会中断数据流。带有Barrier的数据流,如下图所示:

接收到Barrier n的Stream被临时搁置,来自这些Stream的记录不会被处理,而是被放在一个Buffer中
一旦最后一个Stream接收到Barrier n,Operator会emit所有暂存在Buffer中的记录,然后向Checkpoint Coordinator发送Snapshot n
继续处理来自多个Stream的记录

基于Stream Aligning操作能够实现Exactly Once语义,但是也会给流处理应用带来延迟,因为为了排列对齐Barrier,会暂时缓存一部分Stream的记录到Buffer中,尤其是在数据流并行度很高的场景下可能更加明显,通常以最迟对齐Barrier的一个Stream为处理Buffer中缓存记录的时刻点。在Flink中,提供了一个开关,选择是否使用Stream Aligning,如果关掉则Exactly Once会变成At least once。
调度机制
在JobManager端,会接收到Client提交的JobGraph形式的Flink Job,JobManager会将一个JobGraph转换映射为一个ExecutionGraph,如下图所示:

迭代机制
机器学习和图计算应用,都会使用到迭代计算,Flink通过在迭代Operator中定义Step函数来实现迭代算法,这种迭代算法包括Iterate和Delta Iterate两种类型,在实现上它们反复地在当前迭代状态上调用Step函数,直到满足给定的条件才会停止迭代。下面,对Iterate和Delta Iterate两种类型的迭代算法原理进行说明:
Iterate

Iterate Operator是一种简单的迭代形式:每一轮迭代,Step函数的输入或者是输入的整个数据集,或者是上一轮迭代的结果,通过该轮迭代计算出下一轮计算所需要的输入(也称为Next Partial Solution),满足迭代的终止条件后,会输出最终迭代结果,具体执行流程如下图所示:

Delta Iterate Operator实现了增量迭代,它的实现原理如下图所示:

另外,Flink还提供了3个参数来配置Backpressure监控行为:
参数名称
默认值
说明

jobmanager.web.backpressure.refresh-interval
60000
默认1分钟,表示采样统计结果刷新时间间隔

jobmanager.web.backpressure.num-samples
100
评估Backpressure状态,所使用的堆栈跟踪调用次数

jobmanager.web.backpressure.delay-between-samples
50
默认50毫秒,表示对一个Job的每个Task依次调用的时间间隔

通过上面个定义的Backpressure状态,以及调整相应的参数,可以确定当前运行的Job的状态是否正常,并且保证不影响JobManager提供服务。
参考链接
http://flink.apache.org/
http://flink.apache.org/features.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/general_arch.html
http://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/
https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/stream_checkpointing.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/job_scheling.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/event_time.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/yarn_setup.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/jobmanager_high_availability.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/libs/cep.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/libs/gelly.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/libs/ml/index.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/libs/table.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/connectors.html
http://geek.csdn.net/news/detail/56272
http://samza.apache.org/

3. 开源的大数据框架有哪些

文件存储:Hadoop HDFS、Tachyon、KFS离线计算:Hadoop MapRece、Spark流式、实时计算:Storm、Spark Streaming、S4、HeronK-V、NOSQL数据库:HBase、Redis、MongoDB资源管念埋理:YARN、Mesos日志收集:Flume、Scribe、Logstash、Kibana消息系统:Kafka、StormMQ、ZeroMQ、RabbitMQ查询分析:Hive、Impala、Pig、Presto、Phoenix、SparkSQL、Drill、Flink、Kylin、祥竖Druid分布式协调服务:Zookeeper集谨高大群管理与监控:Ambari、Ganglia、Nagios、Cloudera

4. 为什么Flink会成为下一代大数据处理框架的标准

作者:张利兵

如需转载请联系华章 科技

在当前数据量激增传统的时代,不同的业务场景都有大量的业务数据产生, 对于这些不断产生的数据应该如何进行有效地处理,成为当下大多数公司所面临的问题。

随着雅虎对Hadoop的开源,越来越多的大数据处理技术开始涌入人们的视线,例如目前比较流行大数据处理引擎Apache Spark,基本上已经取代了MapRece成为当前大数据处理的标准。

但随着数据的不断增长,新技术的不断发展,人们逐渐意识到对实时数据处理的重要性,企业需要能够同时支持高吞吐、低延迟、高性能的流处理技术来处理日益增长的数据。

相对于传统的数据处理模式,流式数据处理则有着更高的处理效率和成本控制。 Apache Flink就是近年来在开源社区发展不断发展的能够支持同时支持高吞吐、低延迟、高性能分布式处理框架。

在2010至2014年间,由柏林工业大学,柏林洪堡大学和哈索普拉特纳研究所联合发起名为“Stratosphere: Information Management on the Cloud”研究项目,该项目在当时的社区逐渐具有一定社区知名度,2014年4月,Stratosphere代码被贡献给Apache 软件基金会,成为Apache基金会孵化器项目。

期初参与该项目的核心成员均来自Stratosphere原来的核心成员,之后团队的大部分创始成员离开学校,共同创办了一家名叫Data Artisans的公司,其主要业务便是将Stratosphere,也就是之后的Flink实现商业化。在项目孵化期间,项目Stratosphere改名为Flink。

Flink在德语中是快速和灵敏的意思,用来体现流式数据处理器的速度快和灵活性强等特点,同时使用棕红色松鼠图案作为Flink项目的Logo,也是主要借助于松鼠灵活快速的特点,由此Flink开始正式地进入社区开发者的视线。

在2014年12月,该项目成为Apache 软件基金会顶级项目,从2015年09月发布第一个稳定版本0.9,到2019年4月已经发布到1.8的版本,更多的社区开发成员也逐步地加入,现在Flink在全球范轮前围内拥有燃棚350多位的开发人员, 不断有新的特性被发布。

同时在全球范围内,越来越多的公司开始使用Flink,在国内比较出名的互联网公司如Alibaba,美团,滴滴等,都在大规模的使用Flink作为企业的分布式大数据处理引擎。

Flink在近年来逐步被人们所熟知和使用,其主要原因不仅因为提供同时支持高吞吐、低延迟和exactly-once语义的实时计算能力,同时Flink还提供了基于流式计算引擎处理批量数据的计算能力,真正意义实现了批流统一,同时随着Alibaba对Blink的开源,极大地增强了Flink对批计算领域的支持。

众多优秀的特性,使得Flink成为开源大数据数据处理框架中的一颗新星,随着国内社区不断推动, 越来越多的国内公司开始选择使用Flink作为实时数据处理的技术 ,在将来不久的时间内,Flink也将会成为企业内部主流的数据处理框架,最终成为下一代大数据数据处理框架的标准。

有状态流计算将会随着技术的发展,逐步成为企业作为构建数据平台的架构模式,而这种技术实现的开源方案目前从社区来看,能够满足的就是Apache Flink。 Flink通过实现Google Dataflow流式计算模型实现了高吞吐,低延迟,高性能兼具实时流式计算框架。

同时Flink支持高效容错的状态管理,Flink能够将其状态维护在内存或RockDB数据库中,为了防止状态在计算过程中因为系统异常而出现丢失,Flink周期性的通过分布式快照技术CheckPoints实现状态的持久化维护,使得在系统即使在停机或者异常的情况下都能正确的进行状态恢复,从而保证皮桐则在任何时间都能计算出正确的结果。

数据架构的演变过程,伴随着技术的不断迭代更新,Flink具有先进的架构理念,以及诸多的优秀特性,以及完善的编程接口,而Flink也在每一次的Release版本中,不断推出新的特性。

例如Queryable State功能的提出,将直接容许用户通过远程的方式直接获取流式计算任务的状态信息,也就是说数据不需要落地数据库就能直接从流式应用中直接查询出,对于实时交互式的查询业务可以直接从Flink的状态中查询最新的结果,当然这个功能目前还属于Beta版本,但是相信在不久的未来,会变得越来越完善,那时Flink将不仅作为实时流式处理的框架,更多的可能会成为一套实时的存储引擎, 会让更多的用户从有状态计算的技术中获取收益。

Flink是一套集高吞吐,低延迟,高性能三者于一身的分布式流式数据处理框架。

非常成熟的计算框架Apache Spark也只能兼顾高吞吐和高性能特性,在Spark Streaming流式计算中无法做到低延迟保障;而Apache Storm只能支持低延迟和高性能特性,但是无法满足高吞吐的要求。而对于满足高吞吐,低延迟,高性能这三个目标对分布式流式计算框架是非常重要的。

在流式计算领域中,窗口计算的地位举足轻重,但目前大多数计算框架窗口计算所采用的都是系统时间(Process Time),也是事件传输到计算框架处理时,系统主机的当前时间,Flink能够支持基于 事件时间 (Event Time)语义的进行窗口计算,就是使用事件产生的时间,这种时间机制使得事件即使无序到达甚至延迟到达,数据流都能够计算出精确的结果,同时保持了事件原本产生时的在时间维度的特点,而不受网络传输或者计算框架的影响。

Flink在1.4版本中实现了状态管理,所谓状态就是在流式计算过程中将算子的中间结果数据的保存在内存或者DB中,等下一个事件进入接着从状态中获取中间结果进行计算,从而无需基于全部的原始数据统计结果,这种做法极大地提升了系统的性能,同时也降低了计算过程的耗时。

对于数据量非常大且逻辑运算非常复杂的流式运算,基于状态的流式计算则显得非常使用。

在流处理应用中,数据是连续不断的,需要通过窗口的方式对流数据进行一定范围的聚合计算,例如统计在过去的1分钟内有多少用户点击了某一网页,在这种情况下,我们必须定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据再进行计算。

Flink将窗口划分为基于Time、Count、Session,以及Data-driven等类型的窗口操作,窗口能够用灵活的触发条件定制化从而达到对复杂的流传输模式的支持,不同的窗口操作应用能够反馈出真实事件产生的情况,用户可以定义不同的窗口触发机制来满足不同的需求。

Flink能够分布式运行在上千个节点之上,将一个大型计算的流程拆解成小的计算过程,然后将计算过程分布到单台并行节点上进行处理。

在任务执行过程中,能够自动的发现事件处理过程中的错误而导致数据不一致的问题,常见的错误类型例如:节点宕机,或者网路传输问题,或是由于用户因为升级或修复问题而导致计算服务重启等。

在这些情况下,通过基于分布式快照技术的 Checkpoints ,将执行过程中的任务信息进行持久化存储,一旦任务出现异常宕机,Flink能够进行任务的自动恢复,从而确保数据在处理过程中的一致性。

内存管理是每套计算框架需要重点考虑的领域,尤其对于计算量比较大的计算场景,数据在内存中该如何进行管理,针对内存管理这块,Flink实现了自身管理内存的机制,尽可能减少Full GC对系统的影响。

另外通过自定义序列化/反序列化方法将所有的对象转换成二进制在内存中存储,降低数据存储的大小,更加有效的对内存空间进行利用,降低GC所带来的性能下降或者任务停止的风险,同时提升了分布式处理过数据传输的性能。

因此Flink较其他分布式处理的框架则会显得更加稳定,不会因为JVM GC等问题而导致整个应用宕机的问题。

对于7*24小时运行的流式应用,数据源源不断的接入,在一段时间内应用的终止都有可能导致数据的丢失或者计算结果的不准确性,例如进行版本的升级,停机运维操作等,都能导致这种情况发生。

然而值得一提的是Flink通过其Save Points技术能够将任务执行的快照(Snapshot)保存在存储介质上,等待任务重启的时候可以直接从实现保存的Save Points恢复原有的计算状态,使得任务继续按照停机之前的状态继续运行,Save Points技术可以让用户更好的管理和运维实时流式应用。

同时Flink除了上述的特性之外也具有其他非常优秀的特性,可以让用户有更多选择。 Flink具备非常多的优秀特性,这不仅让Flink在社区的知名度越来越高,也吸引了众多的企业参与研发和使用Flink这项技术。

关于作者:张利兵,资深架构师,流式计算领域专家,第四范式华东区AI项目架构师,原明略数据华东区大数据架构师。有多年大数据、流式计算方面的开发经验,对Hadoop、Spark、Flink等大数据计算引擎有着非常深入的理解,积累了丰富的项目实践经验。

推荐语: 从功能、原理、实战和调优4个维度循序渐进讲解利用Flink进行分布式流式应用开发,指导读者从零基础入门到进阶。适读人群:流计算开发工程师、大数据架构工程师、大数据开发工程师、数据挖掘工程师、高校研究生以及高年级本科生。

5. 什么是大数据的主流框架

市场上有许多可用的框架。其中一些更受欢迎,例如Spark,Hadoop,Hive和Storm。Presto在效用指数上得分很高,而Flink具有巨大的潜力。
1. Apache Hadoop
Hadoop是基于Java的平台。这是一个开放源代码框架,可跨集群排列的一组硬件机器提供批处理数据处理和数据存储服务。Hadoop同样适用于可靠,可扩展和分布式的计算。但是,它也可以用作通用文件存储。它可以存储和处理PB的信息。Hadoop由三个主要组件组成。
2. Apache Spark
Spark框架由加利福尼亚大学伯克利分校成立。它是具有改进的数据流处理的批处理框架。借助完整的内存计算以及处理优化,它保证了极其快速的集群计算系统。
3.Apache Storm
Apache Storm是另一个引人注目的解决方案,专注于处理巨大的实时数据流。Storm的主要亮点是可伸缩性和停机后的迅速恢复能力。
4. Apache Flink
Apache Flink是一个开源框架,同样适用于批处理和流数据处理。它最适合于集群环境。该框架基于转换–流概念。它也是大数据的4G。它比Hadoop – Map Rece快100倍。
5. Presto
Presto是最适合较小数据集的开源分布式SQL工具。Presto配备了协调员以及各种工人。当客户提交查询时,将对这些查询进行解析,分析,计划执行并分配给协调员在工作人员之间进行处理。
6. Samza
Apache Samza是有状态的流,准备与Kafka共同开发的大数据系统。Kafka提供数据服务,缓冲和容错能力。

6. Flink SQL 知其所以然(五)| 自定义 protobuf format

protobuf 作为目前各大公司中最广泛使用的高效的协议数据交换格式工具库,会大量作为流式数据传输的序列化方式,所以在 flink sql 中如果能实现 protobuf 的 format 会非常有用( 目前社区已经有对应的实现,不过目前还没有 merge,预计在 1.14 系列版本中能 release )。

issue 见: https://issues.apache.org/jira/browse/FLINK-18202?filter=-4&jql=project%20%3D%20FLINK%20AND%20issuetype%20%3D%20%22New%20Feature%22%20AND%20text%20~%20protobuf%20order%20by%20created%20DESC

pr 见: https://github.com/apache/flink/pull/14376

这一节主要介绍 flink sql 中怎么自定义实现 format ,其中以最常使用的 protobuf 作为案例来介绍。

如果想在本地直接测试下:

关于为什么选择 protobuf 可以看这篇文章,写的很详细:

http://hengyunabc.github.io/thinking-about-grpc-protobuf/?utm_source=tuicool&utm_medium=referral

在实时计算的领域中,为了可读性会选择 json ,为了效率以及一些已经依赖了 grpc 的公司会选择 protobuf 来做数据序列化,那么自然而然,日志的序列化方式也会选择 protobuf 。

而官方目前已经 release 的版本中是没有提供 flink sql api 的 protobuf format 的。如下图,基于 1.13 版本。

https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/

因此本文在介绍怎样自定义一个 format 的同时,实现一个 protobuf format 来给大家使用。

预期效果是先实现几种最基本的数据类型,包括 protobuf 中的 message (自定义 model)、 map (映射)、 repeated (列表)、其他基本数据类型等,这些都是我们最常使用的类型。

预期 protobuf message 定义如下:

测试数据源数据如下,博主把 protobuf 的数据转换为 json,以方便展示,如下图:

预期 flink sql:

数据源表 DDL:

数据汇表 DDL:

Transform 执行逻辑:

下面是我在本地跑的结果:

可以看到打印的结果,数据是正确的被反序列化读入,并且最终输出到 console。

目前业界可以参考的实现如下: https://github.com/maosuhan/flink-pb , 也就是这位哥们负责目前 flink protobuf 的 format。

这种实现的具体使用方式如下:

其实现有几个特点:

[图片上传失败...(image-66c35b-1644940704671)]

其实上节已经详细描述了 flink sql 对于 sourcesinkformat 的加载机制。

如图 serde format 是通过 TableFactoryHelper.discoverDecodingFormat 和 TableFactoryHelper.discoverEncodingFormat 创建的

所有通过 SPI 的 sourcesinkformt 插件都继承自 Factory 。

整体创建 format 方法的调用链如下图。

最终实现如下,涉及到了几个实现类:

具体流程:

上述实现类的具体关系如下:

介绍完流程,进入具体实现方案细节:

ProtobufFormatFactory 主要创建 format 的逻辑:

resourcesMETA-INF 文件:

主要实现反序列化的逻辑:

可以注意到上述反序列化的主要逻辑就集中在 runtimeConverter 上,即 ProtobufToRowDataConverters.ProtobufToRowDataConverter 。

ProtobufToRowDataConverters.ProtobufToRowDataConverter 就是在 ProtobufToRowDataConverters 中定义的。

ProtobufToRowDataConverters.ProtobufToRowDataConverter 其实就是一个 convertor 接口:

其作用就是将 protobuf message 中的每一个字段转换成为 RowData 中的每一个字段。

ProtobufToRowDataConverters 中就定义了具体转换逻辑,如截图所示,每一个 LogicalType 都定义了 protobuf message 字段转换为 flink 数据类型的逻辑:

源码后台回复 flink sql 知其所以然(五)| 自定义 protobuf format 获取。

本文主要是针对 flink sql protobuf format 进行了原理解释以及对应的实现。
如果你正好需要这么一个 format,直接后台回复 flink sql 知其所以然(五)| 自定义 protobuf format 获取源码吧。

当然上述只是 protobuf format 一个基础的实现,用于生产环境还有很多方面可以去扩展的。

7. flinksql-core-动态表

普通动态表是FlinkSQL中的一类表,表中的数据与连接的外部数据对等,可以简单理解为把一张mysql的表放进flink内存中得到的表,并且该表与mysql表有连接关系,即该表可以读写mysql表。

需要声明表的字段定义和表属性(连接器属性)。语法如下:

with关键字前面的是字段定义,with关键字后面的是表属性。其中字段定义时还可以声明表主键,声明语法为PARIMARY KEY(myColumn1,...) NOT ENFORCED, 这里的not enforced表示flinksql不会对主键做强制的唯一性约束、非空约束,而且目前flinksql中只支持这种类型的主键。
表属性中有若干个属性字段需要声明,具体有哪些属性字段取决于使用哪个连接器,如上述声明中使用的是jdbc连接器,在使用该连接器时需要提供url、username、password等属性,通过此连接器我们就可以让该表能连接到对应的mysql表。

我们可以查询flinksql普通动态表的数据,此数据与连接的外部数据是一致的。语法如下:

tips:在运行时,只会加载一次外部数据到flinksql普通动态表。后续外部数据表有更新时,flinksql的普通动态表不会跟着自动更新。

我们可以把数据写入到flinksql动态表,从而实现写入数据到外部系统的目的。语法如下:

8. Flink SQL实战演练之自定义Clickhouse Connector

简介:实时数仓目前的架构是flink+clickhouse,社区目前jdbc connector不支持clickhouse的方言,所以决定自定义clickhouse connector实现flink sql写入数据到clickhouse。

目前想要实现flink sql数据落地到ck,可以修改jdbc connector的源码,增加ck方言,或者采用阿里提供的ck connector包,为了更好的理解flink connector的原理,这里自定义connector实现。

目前支持Flink写入Clickhouse的依赖哭比较多,如果数据格式固定,可以CSV的方式写入,如果不固定,可采用Json的方式写入。

9. 转载:阿里巴巴为什么选择Apache Flink

本文主要整理自阿里巴巴计算平台事业部资深技术专家莫问在云栖大会的演讲。

合抱之木,生于毫末

随着人工智能时代的降临,数据量的爆发,在典型的大数据的业务场景下数据业务最通用的做法是:选用批处理的技术处理全量数据,采用流式计算处理实时增量数据。在绝大多数的业务场景之下,用户的业务逻辑在批处理和流处理之中往往是相同的。但是,用户用于批处理和流处理的两套计算引擎是不同的。

因此,用户通常需要写两套代码。毫无疑问,这带来了一些额外的负担和成本。阿里巴巴的商品数据处理就经常需要面对增量和全量两套不同的业务流程问题,所以阿里就在想,我们能不能有一套统一的大数据引擎技术,用户只需要根据自己的业务逻辑开发一套代码。这样在各种不同的场景下,不管是全量数据还是增量数据,亦或者实时处理,一套方案即可全部支持, 这就是阿里选择Flink的背景和初衷

目前开源大数据计算引擎有很多选择,流计算如Storm,Samza,Flink,Kafka Stream等,批处理如Spark,Hive,Pig,Flink等。而同时支持流处理和批处理的计算引擎,只有两种选择:一个是Apache Spark,一个是Apache Flink。

从技术,生态等各方面的综合考虑。首先,Spark的技术理念是基于批来模拟流的计算。而Flink则完全相反,它采用的是基于流计算来模拟批计算。

从技术发展方向看,用批来模拟流有一定的技术局限性,并且这个局限性可能很难突破。而Flink基于流来模拟批,在技术上有更好的扩展性。从长远来看,阿里决定用Flink做一个统一的、通用的大数据引擎作为未来的选型。

Flink是一个低延迟、高吞吐、统一的大数据计算引擎。在阿里巴巴的生产环境中,Flink的计算平台可以实现毫秒级的延迟情况下,每秒钟处理上亿次的消息或者事件。同时Flink提供了一个Exactly-once的一致性语义。保证了数据的正确性。这样就使得Flink大数据引擎可以提供金融级的数据处理能力。

Flink在阿里的现状

基于Apache Flink在阿里巴巴搭建的平台于2016年正式上线,并从阿里巴巴的搜索和推荐这两大场景开始实现。目前阿里巴巴所有的业务,包括阿里巴巴所有子公司都采用了基于Flink搭建的实时计算平台。同时Flink计算平台运行在开源的Hadoop集群之上。采用Hadoop的YARN做为资源管理调度,以 HDFS作为数据存储。因此,Flink可以和开源大数据软件Hadoop无缝对接。

目前,这套基于Flink搭建的实时计算平台不仅服务于阿里巴巴集团内部,而且通过阿里云的云产品API向整个开发者生态提供基于Flink的云产品支持。

Flink在阿里巴巴的大规模应用,表现如何?

规模: 一个系统是否成熟,规模是重要指标,Flink最初上线阿里巴巴只有数百台服务器,目前规模已达上万台,此等规模在全球范围内也是屈指可数;

状态数据: 基于Flink,内部积累起来的状态数据已经是PB级别规模;

Events: 如今每天在Flink的计算平台上,处理的数据已经超过万亿条;

PS: 在峰值期间可以承担每秒超过4.72亿次的访问,最典型的应用场景是阿里巴巴双11大屏;

Flink的发展之路

接下来从开源技术的角度,来谈一谈Apache Flink是如何诞生的,它是如何成长的?以及在成长的这个关键的时间点阿里是如何进入的?并对它做出了那些贡献和支持?

Flink诞生于欧洲的一个大数据研究项目StratoSphere。该项目是柏林工业大学的一个研究性项目。早期,Flink是做Batch计算的,但是在2014年,StratoSphere里面的核心成员孵化出Flink,同年将Flink捐赠Apache,并在后来成为Apache的顶级大数据项目,同时Flink计算的主流方向被定位为Streaming,即用流式计算来做所有大数据的计算,这就是Flink技术诞生的背景。

2014年Flink作为主攻流计算的大数据引擎开始在开源大数据行业内崭露头角。区别于Storm,Spark Streaming以及其他流式计算引擎的是:它不仅是一个高吞吐、低延迟的计算引擎,同时还提供很多高级的功能。比如它提供了有状态的计算,支持状态管理,支持强一致性的数据语义以及支持Event Time,WaterMark对消息乱序的处理。

Flink核心概念以及基本理念

Flink最区别于其他流计算引擎的,其实就是状态管理。

什么是状态?例如开发一套流计算的系统或者任务做数据处理,可能经常要对数据进行统计,如Sum,Count,Min,Max,这些值是需要存储的。因为要不断更新,这些值或者变量就可以理解为一种状态。如果数据源是在读取Kafka,RocketMQ,可能要记录读取到什么位置,并记录Offset,这些Offset变量都是要计算的状态。

Flink提供了内置的状态管理,可以把这些状态存储在Flink内部,而不需要把它存储在外部系统。这样做的好处是第一降低了计算引擎对外部系统的依赖以及部署,使运维更加简单;第二,对性能带来了极大的提升:如果通过外部去访问,如Redis,HBase它一定是通过网络及RPC。如果通过Flink内部去访问,它只通过自身的进程去访问这些变量。同时Flink会定期将这些状态做Checkpoint持久化,把Checkpoint存储到一个分布式的持久化系统中,比如HDFS。这样的话,当Flink的任务出现任何故障时,它都会从最近的一次Checkpoint将整个流的状态进行恢复,然后继续运行它的流处理。对用户没有任何数据上的影响。

Flink是如何做到在Checkpoint恢复过程中没有任何数据的丢失和数据的冗余?来保证精准计算的?

这其中原因是Flink利用了一套非常经典的Chandy-Lamport算法,它的核心思想是把这个流计算看成一个流式的拓扑,定期从这个拓扑的头部Source点开始插入特殊的Barries,从上游开始不断的向下游广播这个Barries。每一个节点收到所有的Barries,会将State做一次Snapshot,当每个节点都做完Snapshot之后,整个拓扑就算完整的做完了一次Checkpoint。接下来不管出现任何故障,都会从最近的Checkpoint进行恢复。

Flink利用这套经典的算法,保证了强一致性的语义。这也是Flink与其他无状态流计算引擎的核心区别。

下面介绍Flink是如何解决乱序问题的。比如星球大战的播放顺序,如果按照上映的时间观看,可能会发现故事在跳跃。

在流计算中,与这个例子是非常类似的。所有消息到来的时间,和它真正发生在源头,在线系统Log当中的时间是不一致的。在流处理当中,希望是按消息真正发生在源头的顺序进行处理,不希望是真正到达程序里的时间来处理。Flink提供了Event Time和WaterMark的一些先进技术来解决乱序的问题。使得用户可以有序的处理这个消息。这是Flink一个很重要的特点。

接下来要介绍的是Flink启动时的核心理念和核心概念,这是Flink发展的第一个阶段;第二个阶段时间是2015年和2017年,这个阶段也是Flink发展以及阿里巴巴介入的时间。故事源于2015年年中,我们在搜索事业部的一次调研。当时阿里有自己的批处理技术和流计算技术,有自研的,也有开源的。但是,为了思考下一代大数据引擎的方向以及未来趋势,我们做了很多新技术的调研。

结合大量调研结果,我们最后得出的结论是:解决通用大数据计算需求,批流融合的计算引擎,才是大数据技术的发展方向,并且最终我们选择了Flink。

但2015年的Flink还不够成熟,不管是规模还是稳定性尚未经历实践。最后我们决定在阿里内部建立一个Flink分支,对Flink做大量的修改和完善,让其适应阿里巴巴这种超大规模的业务场景。在这个过程当中,我们团队不仅对Flink在性能和稳定性上做出了很多改进和优化,同时在核心架构和功能上也进行了大量创新和改进,并将其贡献给社区,例如:Flink新的分布式架构,增量Checkpoint机制,基于Credit-based的网络流控机制和Streaming SQL等。

阿里巴巴对Flink社区的贡献

我们举两个设计案例,第一个是阿里巴巴重构了Flink的分布式架构,将Flink的Job调度和资源管理做了一个清晰的分层和解耦。这样做的首要好处是Flink可以原生的跑在各种不同的开源资源管理器上。经过这套分布式架构的改进,Flink可以原生地跑在Hadoop Yarn和Kubernetes这两个最常见的资源管理系统之上。同时将Flink的任务调度从集中式调度改为了分布式调度,这样Flink就可以支持更大规模的集群,以及得到更好的资源隔离。

另一个是实现了增量的Checkpoint机制,因为Flink提供了有状态的计算和定期的Checkpoint机制,如果内部的数据越来越多,不停地做Checkpoint,Checkpoint会越来越大,最后可能导致做不出来。提供了增量的Checkpoint后,Flink会自动地发现哪些数据是增量变化,哪些数据是被修改了。同时只将这些修改的数据进行持久化。这样Checkpoint不会随着时间的运行而越来越难做,整个系统的性能会非常地平稳,这也是我们贡献给社区的一个很重大的特性。

经过2015年到2017年对Flink Streaming的能力完善,Flink社区也逐渐成熟起来。Flink也成为在Streaming领域最主流的计算引擎。因为Flink最早期想做一个流批统一的大数据引擎,2018年已经启动这项工作,为了实现这个目标,阿里巴巴提出了新的统一API架构,统一SQL解决方案,同时流计算的各种功能得到完善后,我们认为批计算也需要各种各样的完善。无论在任务调度层,还是在数据Shuffle层,在容错性,易用性上,都需要完善很多工作。

篇幅原因,下面主要和大家分享两点:

● 统一 API Stack

● 统一 SQL方案

先来看下目前Flink API Stack的一个现状,调研过Flink或者使用过Flink的开发者应该知道。Flink有2套基础的API,一套是DataStream,一套是DataSet。DataStream API是针对流式处理的用户提供,DataSet API是针对批处理用户提供,但是这两套API的执行路径是完全不一样的,甚至需要生成不同的Task去执行。所以这跟得到统一的API是有冲突的,而且这个也是不完善的,不是最终的解法。在Runtime之上首先是要有一个批流统一融合的基础API层,我们希望可以统一API层。

因此,我们在新架构中将采用一个DAG(有限无环图)API,作为一个批流统一的API层。对于这个有限无环图,批计算和流计算不需要泾渭分明的表达出来。只需要让开发者在不同的节点,不同的边上定义不同的属性,来规划数据是流属性还是批属性。整个拓扑是可以融合批流统一的语义表达,整个计算无需区分是流计算还是批计算,只需要表达自己的需求。有了这套API后,Flink的API Stack将得到统一。

除了统一的基础API层和统一的API Stack外,同样在上层统一SQL的解决方案。流和批的SQL,可以认为流计算有数据源,批计算也有数据源,我们可以将这两种源都模拟成数据表。可以认为流数据的数据源是一张不断更新的数据表,对于批处理的数据源可以认为是一张相对静止的表,没有更新的数据表。整个数据处理可以当做SQL的一个Query,最终产生的结果也可以模拟成一个结果表。

对于流计算而言,它的结果表是一张不断更新的结果表。对于批处理而言,它的结果表是相当于一次更新完成的结果表。从整个SOL语义上表达,流和批是可以统一的。此外,不管是流式SQL,还是批处理SQL,都可以用同一个Query来表达复用。这样以来流批都可以用同一个Query优化或者解析。甚至很多流和批的算子都是可以复用的。

Flink的未来方向

首先,阿里巴巴还是要立足于Flink的本质,去做一个全能的统一大数据计算引擎。将它在生态和场景上进行落地。目前Flink已经是一个主流的流计算引擎,很多互联网公司已经达成了共识:Flink是大数据的未来,是最好的流计算引擎。下一步很重要的工作是让Flink在批计算上有所突破。在更多的场景下落地,成为一种主流的批计算引擎。然后进一步在流和批之间进行无缝的切换,流和批的界限越来越模糊。用Flink,在一个计算中,既可以有流计算,又可以有批计算。

第二个方向就是Flink的生态上有更多语言的支持,不仅仅是Java,Scala语言,甚至是机器学习下用的Python,Go语言。未来我们希望能用更多丰富的语言来开发Flink计算的任务,来描述计算逻辑,并和更多的生态进行对接。

最后不得不说AI,因为现在很多大数据计算的需求和数据量都是在支持很火爆的AI场景,所以在Flink流批生态完善的基础上,将继续往上走,完善上层Flink的Machine Learning算法库,同时Flink往上层也会向成熟的机器学习,深度学习去集成。比如可以做Tensorflow On Flink, 让大数据的ETL数据处理和机器学习的Feature计算和特征计算,训练的计算等进行集成,让开发者能够同时享受到多种生态给大家带来的好处。

10. flink sql 知其所以然(十三):流 join问题解决

本节是 flink sql 流 join 系列的下篇,上篇的链接如下:

废话不多说,咱们先直接上本文的目录和结论,小伙伴可以先看结论快速了解博主期望本文能给小伙伴们带来什么帮助:

书接上文,上文介绍了曝光流在关联点击流时,使用 flink sql regular join 存在的 retract 问题。

本文介绍怎么使用 flink sql interval join 解决这些问题。

flink sql 知其所以然(十二):流 join 很难嘛???(上)

看看上节的实际案例,来看看在具体输入值的场景下,输出值应该长啥样。

场景:即常见的曝光日志流(show_log)通过 log_id 关联点击日志流(click_log),将数据的关联结果进行下发。

来一波输入数据:

曝光数据:

点击数据:

预期输出数据如下:

上节的 flink sql regular join 解决方案如下:

上节说道,flink sql left join 在流数据到达时,如果左表流(show_log)join 不到右表流(click_log) ,则不会等待右流直接输出(show_log,null),在后续右表流数据代打时,会将(show_log,null)撤回,发送(show_log,click_log)。这就是为什么产生了 retract 流,从而导致重复写入 kafka。

对此,我们也是提出了对应的解决思路,既然 left join 中左流不会等待右流,那么能不能让左流强行等待右流一段时间,实在等不到在数据关联不到的数据即可。

当当当!!!

本文的 flink sql interval join 登场,它就能等。

大家先通过下面这句话和图简单了解一下 interval join 的作用(熟悉 DataStream 的小伙伴萌可能已经使用过了),后续会详细介绍原理。

interval join 就是用一个流的数据去关联另一个流的一段时间区间内的数据。关联到就下发关联到的数据,关联不到且在超时后就根据是否是 outer join(left join,right join,full join)下发没关联到的数据。

<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">interval join</figcaption>

来看看上述案例的 flink sql interval join sql 怎么写:

这里设置了 show_log.row_time BETWEEN click_log.row_time - INTERVAL ཆ' MINUTE AND click_log.row_time + INTERVAL ཆ' MINUTE 代表 show_log 表中的数据会和 click_log 表中的 row_time 在前后 10 分钟之内的数据进行关联。

运行结果如下:

如上就是我们期望的正确结果了。

flink web ui 算子图如下:

<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">flink web ui</figcaption>

那么此时你可能有一个问题,结果中的前两条数据 join 到了输出我是理解的,那当 show_log join 不到 click_log 时为啥也输出了?原理是啥?

博主带你们来定位到具体的实现源码。先看一下 transformations。

<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">transformations</figcaption>

可以看到事件时间下 interval join 的具体 operator 是 org.apache.flink.table.runtime.operators.join. 。

其核心逻辑就集中在 processElement1 和 processElement2 中,在 processElement1 和 processElement2 中使用 org.apache.flink.table.runtime.operators.join.interval.RowTimeIntervalJoin 来处理具体 join 逻辑。 RowTimeIntervalJoin 重要方法如下图所示。

TimeIntervalJoin

下面详细给大家解释一下。

join 时,左流和右流会在 interval 时间之内相互等待,如果等到了则输出数据[+(show_log,click_log)],如果等不到,并且另一条流的时间已经推进到当前这条数据在也不可能 join 到另一条流的数据时,则直接输出[+(show_log,null)],[+(null,click_log)]。

举个例子, show_log.row_time BETWEEN click_log.row_time - INTERVAL ཆ' MINUTE AND click_log.row_time + INTERVAL ཆ' MINUTE , 当 click_log 的时间推进到 2021-11-01 11:00:00 时,这时 show_log 来一条 2021-11-01 02:00:00 的数据, 那这条 show_log 必然不可能和 click_log 中的数据 join 到了,因为 click_log 中 2021-11-01 01:50:00 到 2021-11-01 02:10:00 之间的数据以及过期删除了。则 show_log 直接输出 [+(show_log,null)]

以上面案例的 show_log(左表) interval join click_log(右表) 为例(不管是 inner interval join,left interval join,right interval join 还是 full interval join,都会按照下面的流程执行):

上面只是左流 show_log 数据到达时的执行流程(即 ProcessElement1 ),当右流 click_log 到达时也是完全类似的执行流程(即 ProcessElement2 )。

小伙伴萌在使用 interval join 需要注意的两点事项:

本文主要介绍了 flink sql interval 是怎么避免出现 flink regular join 存在的 retract 问题的,并通过解析其实现说明了运行原理,博主期望你读完本文之后能了解到: