当前位置:首页 » 编程语言 » flinksql数据类型转换
扩展阅读
webinf下怎么引入js 2023-08-31 21:54:13
堡垒机怎么打开web 2023-08-31 21:54:11

flinksql数据类型转换

发布时间: 2023-04-13 20:25:18

① Flink sql 知其所以然(五)| 自定义 protobuf format

protobuf 作为目前各大公司中最广泛使用的高效的协议数据交换格式工具库,会大量作为流式数据传输的序列化方式,所以在 flink sql 中如果能实现 protobuf 的 format 会非常有用( 目前社区已经有对应的实现,不过目前还没有 merge,预计在 1.14 系列版本中能 release )。

issue 见: https://issues.apache.org/jira/browse/FLINK-18202?filter=-4&jql=project%20%3D%20FLINK%20AND%20issuetype%20%3D%20%22New%20Feature%22%20AND%20text%20~%20protobuf%20order%20by%20created%20DESC

pr 见: https://github.com/apache/flink/pull/14376

这一节主要介绍 flink sql 中怎么自定义实现 format ,其中以最常使用的 protobuf 作为案例来介绍。

如果想在本地直接测试下:

关于为什么选择 protobuf 可以看这篇文章,写的很详细:

http://hengyunabc.github.io/thinking-about-grpc-protobuf/?utm_source=tuicool&utm_medium=referral

在实时计算的领域中,为了可读性会选择 json ,为了效率以及一些已经依赖了 grpc 的公司会选择 protobuf 来做数据序列化,那么自然而然,日志的序列化方式也会选择 protobuf 。

而官方目前已经 release 的版本中是没有提供 flink sql api 的 protobuf format 的。如下图,基于 1.13 版本。

https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/

因此本文在介绍怎样自定义一个 format 的同时,实现一个 protobuf format 来给大家使用。

预期效果是先实现几种最基本的数据类型,包括 protobuf 中的 message (自定义 model)、 map (映射)、 repeated (列表)、其他基本数据类型等,这些都是我们最常使用的类型。

预期 protobuf message 定义如下:

测试数据源数据如下,博主把 protobuf 的数据转换为 json,以方便展示,如下图:

预期 flink sql:

数据源表 DDL:

数据汇表 DDL:

Transform 执行逻辑:

下面是我在本地跑的结果:

可以看到打印的结果,数据是正确的被反序列化读入,并且最终输出到 console。

目前业界可以参考的实现如下: https://github.com/maosuhan/flink-pb , 也就是这位哥们负责目前 flink protobuf 的 format。

这种实现的具体使用方式如下:

其实现有几个特点:

[图片上传失败...(image-66c35b-1644940704671)]

其实上节已经详细描述了 flink sql 对于 sourcesinkformat 的加载机制。

如图 serde format 是通过 TableFactoryHelper.discoverDecodingFormat 和 TableFactoryHelper.discoverEncodingFormat 创建的

所有通过 SPI 的 sourcesinkformt 插件都继承自 Factory 。

整体创建 format 方法的调用链如下图。

最终实现如下,涉及到了几个实现类:

具体流程:

上述实现类的具体关系如下:

介绍完流程,进入具体实现方案细节:

ProtobufFormatFactory 主要创建 format 的逻辑:

resourcesMETA-INF 文件:

主要实现反序列化的逻辑:

可以注意到上述反序列化的主要逻辑就集中在 runtimeConverter 上,即 ProtobufToRowDataConverters.ProtobufToRowDataConverter 。

ProtobufToRowDataConverters.ProtobufToRowDataConverter 就是在 ProtobufToRowDataConverters 中定义的。

ProtobufToRowDataConverters.ProtobufToRowDataConverter 其实就是一个 convertor 接口:

其作用就是将 protobuf message 中的每一个字段转换成为 RowData 中的每一个字段。

ProtobufToRowDataConverters 中就定义了具体转换逻辑,如截图所示,每一个 LogicalType 都定义了 protobuf message 字段转换为 flink 数据类型的逻辑:

源码后台回复 flink sql 知其所以然(五)| 自定义 protobuf format 获取。

本文主要是针对 flink sql protobuf format 进行了原理解释以及对应的实现。
如果你正好需要这么一个 format,直接后台回复 flink sql 知其所以然(五)| 自定义 protobuf format 获取源码吧。

当然上述只是 protobuf format 一个基础的实现,用于生产环境还有很多方面可以去扩展的。

② 大数据分析应该掌握哪些基础知识

Java基础语法

· 分支结构if/switch

· 循环结构for/while/do while

· 方法声明和调用

· 方法重载

· 数组的使用

· 命令行参数、可变参数

IDEA

· IDEA常用设置、常用快捷键

· 自定义模板

· 关联Tomcat

· Web项目案例实操

面向对象编程

· 封装、继承、多态、构造器、包

· 异常处理机制

· 抽象类、接口、内部类

· 常有基础API、集合List/Set/Map

· 泛型、线程的创建和启动

· 深入集合源码分析、常见数据结构解析

· 线程的安全、同步和通信、IO流体系

· 反射、类的加载机制、网络编程

Java8/9/10/11新特性

· Lambda表达式、方法引用

· 构造器引用、StreamAPI

· jShell(JShell)命令

· 接口的私有方法、Optional加强

· 局部变量的类型推断

· 更简化的编译运行程序等

MySQL

· DML语言、DDL语言、DCL语言

· 分组查询、Join查询、子查询、Union查询、函数

· 流程控制语句、事务的特点、事务的隔离级别等

JDBC

· 使用JDBC完成数据库增删改查操作

· 批处理的操作

· 数据库连接池的原理及应用

· 常见数据库连接池C3P0、DBCP、Druid等

Maven

· Maven环境搭建

· 本地仓库&中央仓库

· 创建Web工程

· 自动部署

· 持续继承

· 持续部署

Linux

· VI/VIM编辑器

· 系统管理操作&远程登录

· 常用命令

· 软件包管理&企业真题

Shell编程

· 自定义变量与特殊变量

· 运算符

· 条件判断

· 流程控制

· 系统函数&自定义函数

· 常用工具命令

· 面试真题

Hadoop

· Hadoop生态介绍

· Hadoop运行模式

· 源码编译

· HDFS文件系统底层详解

· DN&NN工作机制

· HDFS的API操作

· MapRece框架原理

· 数据压缩

· Yarn工作机制

· MapRece案例详解

· Hadoop参数调优

· HDFS存储多目录

· 多磁盘数据均衡

· LZO压缩

· Hadoop基准测试

Zookeeper

· Zookeeper数据结果

· 内部原理

· 选举机制

· Stat结构体

· 监听器

· 分布式安装部署

· API操作

· 实战案例

· 面试真题

· 启动停止脚本

HA+新特性

· HDFS-HA集群配置

Hive

· Hive架构原理

· 安装部署

· 远程连接

· 常见命令及基本数据类型

· DML数据操作

· 查询语句

· Join&排序

· 分桶&函数

· 压缩&存储

· 企业级调优

· 实战案例

· 面试真题

Flume

· Flume架构

· Agent内部原理

· 事务

· 安装部署

· 实战案例

· 自定义Source

· 自定义Sink

· Ganglia监控

Kafka

· 消息队列

· Kafka架构

· 集群部署

· 命令行操作

· 工作流程分析

· 分区分配策略

· 数据写入流程

· 存储策略

· 高阶API

· 低级API

· 拦截器

· 监控

· 高可靠性存储

· 数据可靠性和持久性保证

· ISR机制

· Kafka压测

· 机器数量计算

· 分区数计算

· 启动停止脚本

DataX

· 安装

· 原理

· 数据一致性

· 空值处理

· LZO压缩处理

Scala

· Scala基础入门

· 函数式编程

· 数据结构

· 面向对象编程

· 模式匹配

· 高阶函数

· 特质

· 注解&类型参数

· 隐式转换

· 高级类型

· 案例实操

Spark Core

· 安装部署

· RDD概述

· 编程模型

· 持久化&检查点机制

· DAG

· 算子详解

· RDD编程进阶

· 累加器&广播变量

Spark SQL

· SparkSQL

· DataFrame

· DataSet

· 自定义UDF&UDAF函数

Spark Streaming

· SparkStreaming

· 背压机制原理

· Receiver和Direct模式原理

· Window原理及案例实操

· 7x24 不间断运行&性能考量

Spark内核&优化

· 内核源码详解

· 优化详解

Hbase

· Hbase原理及架构

· 数据读写流程

· API使用

· 与Hive和Sqoop集成

· 企业级调优

Presto

· Presto的安装部署

· 使用Presto执行数仓项目的即席查询模块

Ranger2.0

· 权限管理工具Ranger的安装和使用

Azkaban3.0

· 任务调度工具Azkaban3.0的安装部署

· 使用Azkaban进行项目任务调度,实现电话邮件报警

Kylin3.0

· Kylin的安装部署

· Kylin核心思想

· 使用Kylin对接数据源构建模型

Atlas2.0

· 元数据管理工具Atlas的安装部署

Zabbix

· 集群监控工具Zabbix的安装部署

DolphinScheler

· 任务调度工具DolphinScheler的安装部署

· 实现数仓项目任务的自动化调度、配置邮件报警

Superset

· 使用SuperSet对数仓项目的计算结果进行可视化展示

Echarts

· 使用Echarts对数仓项目的计算结果进行可视化展示

Redis

· Redis安装部署

· 五大数据类型

· 总体配置

· 持久化

· 事务

· 发布订阅

· 主从复制

Canal

· 使用Canal实时监控MySQL数据变化采集至实时项目

Flink

· 运行时架构

· 数据源Source

· Window API

· Water Mark

· 状态编程

· CEP复杂事件处理

Flink SQL

· Flink SQL和Table API详细解读

Flink 内核

· Flink内核源码讲解

· 经典面试题讲解

Git&GitHub

· 安装配置

· 本地库搭建

· 基本操作

· 工作流

· 集中式

ClickHouse

· ClickHouse的安装部署

· 读写机制

· 数据类型

· 执行引擎

DataV

· 使用DataV对实时项目需求计算结果进行可视化展示

sugar

· 结合Springboot对接网络sugar实现数据可视化大屏展示

Maxwell

· 使用Maxwell实时监控MySQL数据变化采集至实时项目

ElasticSearch

· ElasticSearch索引基本操作、案例实操

Kibana

· 通过Kibana配置可视化分析

Springboot

· 利用Springboot开发可视化接口程序

③ flinksql-core-动态表

普通动态表是FlinkSQL中的一类表,表中的数据与连接的外部数据对等,可以简单理解为把一张mysql的表放进flink内存中得到的表,并且该表与mysql表有连接关系,即该表可以读写mysql表。

需要声明表的字段定义和表属性(连接器属性)。语法如下:

with关键字前面的是字段定义,with关键字后面的是表属性。其中字段定义时还可以声明表主键,声明语法为PARIMARY KEY(myColumn1,...) NOT ENFORCED, 这里的not enforced表示flinksql不会对主键做强制的唯一性约束、非空约束,而且目前flinksql中只支持这种类型的主键。
表属性中有若干个属性字段需要声明,具体有哪些属性字段取决于使用哪个连接器,如上述声明中使用的是jdbc连接器,在使用该连接器时需要提供url、username、password等属性,通过此连接器我们就可以让该表能连接到对应的mysql表。

我们可以查询flinksql普通动态表的数据,此数据与连接的外部数据是一致的。语法如下:

tips:在运行时,只会加载一次外部数据到flinksql普通动态表。后续外部数据表有更新时,flinksql的普通动态表不会跟着自动更新。

我们可以把数据写入到flinksql动态表,从而实现写入数据到外部系统的目的。语法如下:

④ Hive sql及窗口函数

hive函数:

1、根据指定条件返回结果:case when then else end as

2、基本类型转换:CAST()

3、nvl:处理空字段:三个str时,是否为空可以指定返回不同的值

4、sql通配符: https://www.w3school.com.cn/sql/sql_wildcards.asp

5、count(1)与COUNT(*):返回行数

如果表没有主键,那么count(1)比count(*)快;

如果有主键,那么count(主键,联合主键)比count(*)快;

count(1)跟count(主键)一样,只扫描主键。count(*)跟count(非主键)一样,扫描整个表。明显前者更快一些。

性能问题:

1.任何情况下SELECT COUNT(*) FROM tablename是最优选择,(指没有where的情况);

2.尽量减少SELECT COUNT(*) FROM tablename WHERE COL = ‘value’ 这种查询;

3.杜绝SELECT COUNT(COL) FROM tablename WHERE COL2 = ‘value’ 的出现。

count(expression):查询 is_reply=0 的数量: SELECT COUNT(IF(is_reply=0,1,NULL)) count FROM t_iov_help_feedback;

6、distinct与group by

distinct去重所有distinct之后所有的字段,如果有一个字段值不一致就不作为一条

group by是根据某一字段分组,然后查询出该条数据的所需字段,可以搭配 where max(time)或者Row_Number函数使用,求出最大的一条数据

7、使用with 临时表名 as() 的形式,简单的临时表直接嵌套进sql中,复杂的和需要复用的表写到临时表中,关联的时候先找到关联字段,过滤条件最好在临时表中先过滤后关联

处理json的函数:

split(json_array_string(schools), '\\|\\|') AS schools

get_json_object(school, '$.id') AS school_id,

字符串函数:

1、instr(’源字符串’ , ‘目标字符串’ ,’开始位置’,’第几次出现’)

instr(sourceString,destString,start,appearPosition)

1.sourceString代表源字符串; destString代表要从源字符串中查找的子串;

2.start代表查找的开始位置,这个参数可选的,默认为1;

3.appearPosition代表想从源字符中查找出第几次出现的destString,这个参数也是可选的, 默认为1

4.如果start的值为负数,则代表从右往左进行查找,但是位置数据仍然从左向右计算。

5.返回值为:查找到的字符串的位置。如果没有查找到,返回0。

最简单例子: 在abcd中查找a的位置,从第一个字母开始查,查找第一次出现时的位置

select instr(‘abcd’,’a’,1,1) from al; —1

应用于模糊查询:instr(字段名/列名, ‘查找字段’)

select code,name,dept,occupation from staff where instr(code, ‘001’)> 0;

等同于 select code, name, dept, occupation from staff where code like ‘%001%’ ;

应用于判断包含关系:

select ccn,mas_loc from mas_loc where instr(‘FH,FHH,FHM’,ccn)>0;

等同于 select ccn,mas_loc from mas_loc where ccn in (‘FH’,’FHH’,’FHM’);

2、substr(string A,int start,int len)和 substring(string A,int start,int len),用法一样

substr(time,1,8) 表示将time从第1位开始截取,截取的长度为8位

第一种用法:

substr(string A,int start)和 substring(string A,int start),用法一样

功效:返回字符串A从下标start位置到结尾的字符串

第二种用法:

substr(string A,int start,int len)和 substring(string A,int start,int len),用法一样

功效:返回字符串A从下标start位置开始,长度为len的字符串

3、get_json_object(form_data,'$.学生姓名') as student_name

json_tuple 函数的作用:用来解析json字符串中的多个字段

4、split(full_name, '\\.') [5] AS zq;  取的是数组里的第六个

日期(时间)函数:

1、to_date(event_time) 返回日期部分

2、date_sub:返回当前日期的相对时间

当前日期:select curdate() 

当前日期前一天:select  date_sub(curdate(),interval 1 day)

当前日期后一天:select date_sub(curdate(),interval -1 day)

date_sub(from_unixtime(unix_timestamp(), 'yyyy-MM-dd HH:mm:ss'), 14)  将现在的时间总秒数转为标准格式时间,返回14天之前的时间

时间戳>>>>日期:

from_unixtime(unix_timestamp(), 'yyyy-MM-dd HH:mm:ss') 将现在的时间总秒数转为标准格式时间

from_unixtime(get_json_object(get_json_object(form_data,'$.挽单时间'),'$.$date')/1000) as retain_time

unix_timestamp('2019-08-15 16:40:00','yyyy-MM-dd HH:mm:ss')  --1565858400

日期>>>>时间戳:unix_timestamp()

date_format:yyyy-MM-dd HH:mm:ss 时间转格式化时间

select date_format('2019-10-07 13:24:20', 'yyyyMMdd000000')-- 20191007000000select date_format('2019-10-07', 'yyyyMMdd000000')-- 20191007000000

1.日期比较函数: datediff语法: datediff(string enddate,string startdate) 

返回值: int 

说明: 返回结束日期减去开始日期的天数。 

举例:  hive> select datediff('2016-12-30','2016-12-29');  1

2.日期增加函数: date_add语法: date_add(string startdate, intdays) 

返回值: string 

说明: 返回开始日期startdate增加days天后的日期。 

举例:  hive>select date_add('2016-12-29',10);  2017-01-08

3.日期减少函数: date_sub语法: date_sub (string startdate,int days) 

返回值: string 

说明: 返回开始日期startdate减少days天后的日期。 

举例:  hive>select date_sub('2016-12-29',10);  2016-12-19

4.查询近30天的数据

select * from table where datediff(current_timestamp,create_time)<=30;

create_time 为table里的字段,current_timestamp 返回当前时间 2018-06-01 11:00:00

3、trunc()函数的用法:当前日期的各种第一天,或者对数字进行不四舍五入的截取

日期:

1.select trunc(sysdate) from al  --2011-3-18  今天的日期为2011-3-18

2.select trunc(sysdate, 'mm')   from   al  --2011-3-1    返回当月第一天.

上月1号    trunc(add_months(current_date(),-1),'MM')

3.select trunc(sysdate,'yy') from al  --2011-1-1       返回当年第一天

4.select trunc(sysdate,'dd') from al  --2011-3-18    返回当前年月日

5.select trunc(sysdate,'yyyy') from al  --2011-1-1   返回当年第一天

6.select trunc(sysdate,'d') from al  --2011-3-13 (星期天)返回当前星期的第一天

7.select trunc(sysdate, 'hh') from al   --2011-3-18 14:00:00   当前时间为14:41  

8.select trunc(sysdate, 'mi') from al  --2011-3-18 14:41:00   TRUNC()函数没有秒的精确

数字:TRUNC(number,num_digits) Number 需要截尾取整的数字。Num_digits 的默认值为 0。TRUNC()函数截取时不进行四舍五入

11.select trunc(123.458,1) from al --123.4

12.select trunc(123.458,-1) from al --120

4、round():四舍五入:

select round(1.455, 2)  #结果是:1.46,即四舍五入到十分位,也就是保留两位小数

select round(1.5)  #默认四舍五入到个位,结果是:2

select round(255, -1)  #结果是:260,即四舍五入到十位,此时个位是5会进位

floor():地板数

ceil()天花板数

5、

6.日期转年函数: year语法:   year(string date) 

返回值: int

说明: 返回日期中的年。

举例:

hive>   select year('2011-12-08 10:03:01') from al;

2011

hive>   select year('2012-12-08') fromal;

2012

7.日期转月函数: month语法: month   (string date) 

返回值: int

说明: 返回日期中的月份。

举例:

hive>   select month('2011-12-08 10:03:01') from al;

12

hive>   select month('2011-08-08') fromal;

8

8.日期转天函数: day语法: day   (string date) 

返回值: int

说明: 返回日期中的天。

举例:

hive>   select day('2011-12-08 10:03:01') from al;

8

hive>   select day('2011-12-24') fromal;

24

9.日期转小时函数: hour语法: hour   (string date) 

返回值: int

说明: 返回日期中的小时。

举例:

hive>   select hour('2011-12-08 10:03:01') from al;

10

10.日期转分钟函数: minute语法: minute   (string date) 

返回值: int

说明: 返回日期中的分钟。

举例:

hive>   select minute('2011-12-08 10:03:01') from al;

3

11.日期转秒函数: second语法: second   (string date) 

返回值: int

说明: 返回日期中的秒。

举例:

hive>   select second('2011-12-08 10:03:01') from al;

1

12.日期转周函数: weekofyear语法:   weekofyear (string date) 

返回值: int

说明: 返回日期在当前的周数。

举例:

hive>   select weekofyear('2011-12-08 10:03:01') from al;

49

查看hive表在hdfs中的位置:show create table 表名;

在hive中hive2hive,hive2hdfs:

HDFS、本地、hive -----> Hive:使用 insert into | overwrite、loaddata local inpath "" into table student;

Hive ----> Hdfs、本地:使用:insert overwrite | local

网站访问量统计:

uv:每用户访问次数

ip:每ip(可能很多人)访问次数

PV:是指页面的浏览次数

VV:是指你访问网站的次数

sql:

基本函数:

count、max、min、sum、avg、like、rlike('2%'、'_2%'、%2%'、'[2]')(java正则)

and、or、not、in   

where、group by、having、{ join on 、full join}  、order by(desc降序)

sort by需要与distribut by集合结合使用:

hive (default)> set maprece.job.reces=3;  //先设置rece的数量 

insert overwrite local directory '/opt/mole/datas/distribute-by'

row format delimited fields terminated by '\t'

先按照部门编号分区,再按照员工编号降序排序。

select * from emp distribute by deptno sort by empno desc;

外部表  create external table if not exists dept

分区表:create table dept_partition ( deptno int, dname string, loc string )  partitioned by ( month string )

load data local inpath '/opt/mole/datas/dept.txt' into table default.dept_partition partition(month='201809'); 

 alter table dept_partition add/drop partition(month='201805') ,partition(month='201804');

多分区联合查询:union

select * from dept_partition2 where month='201809' and day='10';

show partitions dept_partition;

desc formatted dept_partition;

二级分区表:create table dept_partition2 ( deptno int, dname string, loc string ) partitioned by (month string, day string) row format delimited fields terminated by '\t';

分桶抽样查询:分区针对的是数据的存储路径;分桶针对的是数据文件

create table stu_buck(id int, name string) clustered by(id) into 4 bucketsrow format delimited fields terminated by '\t';

设置开启分桶与rece为1:

set hive.enforce.bucketing=true;

set maprece.job.reces=-1;

分桶抽样:select * from stu_bucktablesample(bucket x out of y on id);

抽取,桶数/y,x是从哪个桶开始抽取,y越大 抽样数越少,y与抽样数成反比,x必须小于y

给空字段赋值:

如果员工的comm为NULL,则用-1代替或用其他字段代替  :select nvl(comm,-1) from emp;

case when:如何符合记为1,用于统计、分组统计

select dept_id, sum(case sex when '男' then 1 else 0 end) man , sum(case sex when '女' then 1 else 0 end) woman from emp_sex group by dept_id;

用于组合归类汇总(行转列):UDAF:多转一

concat:拼接查询结果

collect_set(col):去重汇总,产生array类型字段,类似于distinct

select t.base, concat_ws('|',collect_set(t.name))   from (select concat_ws(',',xingzuo,blood_type) base,name  from person_info) t group by t.base;

解释:先第一次查询得到一张没有按照(星座血型)分组的表,然后分组,使用collect_set将名字组合成数组,然后使用concat将数组变成字符串

用于拆分数据:(列转行):UDTF:一转多

explode(col):将hive一列中复杂的array或者map结构拆分成多行。

lateral view  侧面显示:用于和UDTF一对多函数搭配使用

用法:lateral view udtf(expression) tablealias as cate

cate:炸开之后的列别名

temptable :临时表表名

解释:用于和split, explode等UDTF一起使用,它能够将一列数据拆成多行数据,在此基础上可以对拆分后的数据进行聚合。

开窗函数:

Row_Number,Rank,Dense_Rank  over:针对统计查询使用

Row_Number:返回从1开始的序列

Rank:生成分组中的排名序号,会在名词s中留下空位。3 3 5

dense_rank:生成分组中的排名序号,不会在名词中留下空位。3 3 4

over:主要是分组排序,搭配窗口函数使用

结果:

SUM、AVG、MIN、MAX、count

preceding:往前

following:往后

current row:当前行

unbounded:unbounded preceding 从前面的起点, unbounded following:到后面的终点

sum:直接使用sum是总的求和,结合over使用可统计至每一行的结果、总的结果、当前行+之前多少行/之后多少行、当前行到往后所有行的求和。

over(rowsbetween 3/current )  当前行到往后所有行的求和

ntile:分片,结合over使用,可以给数据分片,返回分片号

使用场景:统计出排名前百分之或n分之一的数据。

lead,lag,FIRST_VALUE,LAST_VALUE

lag与lead函数可以返回上下行的数据

lead(col,n,dafault) 用于统计窗口内往下第n行值

第一个参数为列名,第二个参数为往下第n行(可选,默认为1),第三个参数为默认值(当往下第n行为NULL时候,取默认值,如不指定,则为NULL)

LAG(col,n,DEFAULT) 用于统计窗口内往上第n行值

第一个参数为列名,第二个参数为往上第n行(可选,默认为1),第三个参数为默认值(当往上第n行为NULL时候,取默认值,如不指定,则为NULL)

使用场景:通常用于统计某用户在某个网页上的停留时间

FIRST_VALUE:取分组内排序后,截止到当前行,第一个值

LAST_VALUE:取分组内排序后,截止到当前行,最后一个值

范围内求和: https://blog.csdn.net/happyrocking/article/details/105369558

cume_dist,percent_rank

–CUME_DIST :小于等于当前值的 行数 / 分组内总行数

–比如,统计小于等于当前薪水的人数,占总人数的比例

percent_rank:分组内当前行的RANK值-1/分组内总行数-1

总结:

在Spark中使用spark sql与hql一致,也可以直接使用sparkAPI实现。

HiveSql窗口函数主要应用于求TopN,分组排序TopN、TopN求和,前多少名前百分之几。

与Flink窗口函数不同。

Flink中的窗口是用于将无线数据流切分为有限块处理的手段。

window分类:

CountWindow:按照指定的数据条数生成一个 Window,与时间无关。

TimeWindow:按照时间生成 Window。

1. 滚动窗口(Tumbling Windows):时间对齐,窗口长度固定,不重叠::常用于时间段内的聚合计算

2.滑动窗口(Sliding Windows):时间对齐,窗口长度固定,可以有重叠::适用于一段时间内的统计(某接口最近 5min 的失败率来报警)

3. 会话窗口(Session Windows)无时间对齐,无长度,不重叠::设置session间隔,超过时间间隔则窗口关闭。

⑤ Flink:特性、概念、组件栈、架构及原理分析

简单之美 | Apache Flink:特性、概念、组件栈、架构及原理分析
http://shiyanjun.cn/archives/1508.html

Apache Flink是一个面向分布式数据流处理和批量数据处理的开源计算平台,它能够基于同一个Flink运行时(Flink Runtime),提供支持流处理和批处理两种类型应用的功能。现有的开源计算方案,会把流处理和批处理作为两种不同的应用类型,因为他们它们所提供的SLA是完全不相同的:流处理一般需要支持低延迟、Exactly-once保证,而批处理需要支持高吞吐、高效处理,所以在实现的时候通常是分别给出两套实现方法,或者通过一个独立的开源框架来实现其中每一种处理方案。例如,实现批处理的开源方案有MapRece、Tez、Crunch、Spark,实现流处理的开源方案有Samza、Storm。Flink在实现流处理和批处理时,与传统的一些方案完全不同,它从另一个视角看待流处理和批处理,将二者统一起来:Flink是完全支持流处理,也就是说作为流处理看待时输入数据流是无界的;批处理被作为一种特殊的流处理,只是它的输入数据流被定义为有界的。基于同一个Flink运行时(Flink Runtime),分别提供了流处理和批处理API,而这两种API也是实现上层面向流处理、批处理类型应用框架的基础。
基本特性
关于Flink所支持的特性,我这里只是通过分类的方式简单做一下梳理,涉及到具体的一些概念及其原理会在后面的部分做详细说明。
流处理特性
支持高吞吐、低延迟、高性能的流处理
支持带有事件时间的窗口(Window)操作
支持有状态计算的Exactly-once语义
支持高度灵活的窗口(Window)操作,支持基于time、count、session,以及data-driven的窗口操作
支持具有Backpressure功能的持续流模型
支持基于轻量级分布式快照(Snapshot)实现的容错
一个运行时同时支持Batch on Streaming处理和Streaming处理
Flink在JVM内部实现了自己的内存管理
支持迭代计算
支持程序自动优化:避免特定情况下Shuffle、排序等昂贵操作,中间结果有必要进行缓存

API支持
对Streaming数据类应用,提供DataStream API
对批处理类应用,提供DataSet API(支持Java/Scala)

Libraries支持
支持机器学习(FlinkML)
支持图分析(Gelly)
支持关系数据处理(Table)
支持复杂事件处理(CEP)

整合支持
支持Flink on YARN
支持HDFS
支持来自Kafka的输入数据
支持Apache HBase
支持Hadoop程序
支持Tachyon
支持ElasticSearch
支持RabbitMQ
支持Apache Storm
支持S3
支持XtreemFS

基本概念
Stream & Transformation & Operator
用户实现的Flink程序是由Stream和Transformation这两个基本构建块组成,其中Stream是一个中间结果数据,而Transformation是一个操作,它对一个或多个输入Stream进行计算处理,输出一个或多个结果Stream。当一个Flink程序被执行的时候,它会被映射为Streaming Dataflow。一个Streaming Dataflow是由一组Stream和Transformation Operator组成,它类似于一个DAG图,在启动的时候从一个或多个Source Operator开始,结束于一个或多个Sink Operator。下面是一个由Flink程序映射为Streaming Dataflow的示意图,如下所示:

比如从Source[1]到map()[1],它保持了Source的分区特性(Partitioning)和分区内元素处理的有序性,也就是说map()[1]的Subtask看到数据流中记录的顺序,与Source[1]中看到的记录顺序是一致的。
Redistribution模式

这种模式改变了输入数据流的分区,比如从map()[1]、map()[2]到keyBy()/window()/apply()[1]、keyBy()/window()/apply()[2],上游的Subtask向下游的多个不同的Subtask发送数据,改变了数据流的分区,这与实际应用所选择的Operator有关系。另外,Source Operator对应2个Subtask,所以并行度为2,而Sink Operator的Subtask只有1个,故而并行度为1。
Task & Operator Chain
在Flink分布式执行环境中,会将多个Operator Subtask串起来组成一个Operator Chain,实际上就是一个执行链,每个执行链会在TaskManager上一个独立的线程中执行,如下图所示:

在Flink集群启动的时候,TaskManager会向JobManager注册,如果注册成功,则JobManager会向TaskManager回复消息AcknowledgeRegistration。
SubmitJob

Flink程序内部通过Client向JobManager提交Flink Job,其中在消息SubmitJob中以JobGraph形式描述了Job的基本信息。
CancelJob

请求取消一个Flink Job的执行,CancelJob消息中包含了Job的ID,如果成功则返回消息CancellationSuccess,失败则返回消息CancellationFailure。
UpdateTaskExecutionState

TaskManager会向JobManager请求更新ExecutionGraph中的ExecutionVertex的状态信息,更新成功则返回true。
RequestNextInputSplit

运行在TaskManager上面的Task,请求获取下一个要处理的输入Split,成功则返回NextInputSplit。
JobStatusChanged

ExecutionGraph向JobManager发送该消息,用来表示Flink Job的状态发生的变化,例如:RUNNING、CANCELING、FINISHED等。
TaskManager
TaskManager也是一个Actor,它是实际负责执行计算的Worker,在其上执行Flink Job的一组Task。每个TaskManager负责管理其所在节点上的资源信息,如内存、磁盘、网络,在启动的时候将资源的状态向JobManager汇报。TaskManager端可以分成两个阶段:
注册阶段

TaskManager会向JobManager注册,发送RegisterTaskManager消息,等待JobManager返回AcknowledgeRegistration,然后TaskManager就可以进行初始化过程。
可操作阶段

该阶段TaskManager可以接收并处理与Task有关的消息,如SubmitTask、CancelTask、FailTask。如果TaskManager无法连接到JobManager,这是TaskManager就失去了与JobManager的联系,会自动进入“注册阶段”,只有完成注册才能继续处理Task相关的消息。
Client
当用户提交一个Flink程序时,会首先创建一个Client,该Client首先会对用户提交的Flink程序进行预处理,并提交到Flink集群中处理,所以Client需要从用户提交的Flink程序配置中获取JobManager的地址,并建立到JobManager的连接,将Flink Job提交给JobManager。Client会将用户提交的Flink程序组装一个JobGraph, 并且是以JobGraph的形式提交的。一个JobGraph是一个Flink Dataflow,它由多个JobVertex组成的DAG。其中,一个JobGraph包含了一个Flink程序的如下信息:JobID、Job名称、配置信息、一组JobVertex等。
组件栈
Flink是一个分层架构的系统,每一层所包含的组件都提供了特定的抽象,用来服务于上层组件。Flink分层的组件栈如下图所示:

了解YARN的话,对上图的原理非常熟悉,实际Flink也实现了满足在YARN集群上运行的各个组件:Flink YARN Client负责与YARN RM通信协商资源请求,Flink JobManager和Flink TaskManager分别申请到Container去运行各自的进程。通过上图可以看到,YARN AM与Flink JobManager在同一个Container中,这样AM可以知道Flink JobManager的地址,从而AM可以申请Container去启动Flink TaskManager。待Flink成功运行在YARN集群上,Flink YARN Client就可以提交Flink Job到Flink JobManager,并进行后续的映射、调度和计算处理。
Runtime层

Runtime层提供了支持Flink计算的全部核心实现,比如:支持分布式Stream处理、JobGraph到ExecutionGraph的映射、调度等等,为上层API层提供基础服务。
API层

API层主要实现了面向无界Stream的流处理和面向Batch的批处理API,其中面向流处理对应DataStream API,面向批处理对应DataSet API。
Libraries层

该层也可以称为Flink应用框架层,根据API层的划分,在API层之上构建的满足特定应用的实现计算框架,也分别对应于面向流处理和面向批处理两类。面向流处理支持:CEP(复杂事件处理)、基于SQL-like的操作(基于Table的关系操作);面向批处理支持:FlinkML(机器学习库)、Gelly(图处理)。
内部原理
容错机制
Flink基于Checkpoint机制实现容错,它的原理是不断地生成分布式Streaming数据流Snapshot。在流处理失败时,通过这些Snapshot可以恢复数据流处理。理解Flink的容错机制,首先需要了解一下Barrier这个概念:Stream Barrier是Flink分布式Snapshotting中的核心元素,它会作为数据流的记录被同等看待,被插入到数据流中,将数据流中记录的进行分组,并沿着数据流的方向向前推进。每个Barrier会携带一个Snapshot ID,属于该Snapshot的记录会被推向该Barrier的前方。因为Barrier非常轻量,所以并不会中断数据流。带有Barrier的数据流,如下图所示:

接收到Barrier n的Stream被临时搁置,来自这些Stream的记录不会被处理,而是被放在一个Buffer中
一旦最后一个Stream接收到Barrier n,Operator会emit所有暂存在Buffer中的记录,然后向Checkpoint Coordinator发送Snapshot n
继续处理来自多个Stream的记录

基于Stream Aligning操作能够实现Exactly Once语义,但是也会给流处理应用带来延迟,因为为了排列对齐Barrier,会暂时缓存一部分Stream的记录到Buffer中,尤其是在数据流并行度很高的场景下可能更加明显,通常以最迟对齐Barrier的一个Stream为处理Buffer中缓存记录的时刻点。在Flink中,提供了一个开关,选择是否使用Stream Aligning,如果关掉则Exactly Once会变成At least once。
调度机制
在JobManager端,会接收到Client提交的JobGraph形式的Flink Job,JobManager会将一个JobGraph转换映射为一个ExecutionGraph,如下图所示:

迭代机制
机器学习和图计算应用,都会使用到迭代计算,Flink通过在迭代Operator中定义Step函数来实现迭代算法,这种迭代算法包括Iterate和Delta Iterate两种类型,在实现上它们反复地在当前迭代状态上调用Step函数,直到满足给定的条件才会停止迭代。下面,对Iterate和Delta Iterate两种类型的迭代算法原理进行说明:
Iterate

Iterate Operator是一种简单的迭代形式:每一轮迭代,Step函数的输入或者是输入的整个数据集,或者是上一轮迭代的结果,通过该轮迭代计算出下一轮计算所需要的输入(也称为Next Partial Solution),满足迭代的终止条件后,会输出最终迭代结果,具体执行流程如下图所示:

Delta Iterate Operator实现了增量迭代,它的实现原理如下图所示:

另外,Flink还提供了3个参数来配置Backpressure监控行为:
参数名称
默认值
说明

jobmanager.web.backpressure.refresh-interval
60000
默认1分钟,表示采样统计结果刷新时间间隔

jobmanager.web.backpressure.num-samples
100
评估Backpressure状态,所使用的堆栈跟踪调用次数

jobmanager.web.backpressure.delay-between-samples
50
默认50毫秒,表示对一个Job的每个Task依次调用的时间间隔

通过上面个定义的Backpressure状态,以及调整相应的参数,可以确定当前运行的Job的状态是否正常,并且保证不影响JobManager提供服务。
参考链接
http://flink.apache.org/
http://flink.apache.org/features.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/general_arch.html
http://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/
https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/stream_checkpointing.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/job_scheling.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/event_time.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/yarn_setup.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/jobmanager_high_availability.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/libs/cep.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/libs/gelly.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/libs/ml/index.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/libs/table.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/connectors.html
http://geek.csdn.net/news/detail/56272
http://samza.apache.org/