當前位置:首頁 » 編程語言 » flinksql數據類型轉換
擴展閱讀
webinf下怎麼引入js 2023-08-31 21:54:13
堡壘機怎麼打開web 2023-08-31 21:54:11

flinksql數據類型轉換

發布時間: 2023-04-13 20:25:18

① Flink sql 知其所以然(五)| 自定義 protobuf format

protobuf 作為目前各大公司中最廣泛使用的高效的協議數據交換格式工具庫,會大量作為流式數據傳輸的序列化方式,所以在 flink sql 中如果能實現 protobuf 的 format 會非常有用( 目前社區已經有對應的實現,不過目前還沒有 merge,預計在 1.14 系列版本中能 release )。

issue 見: https://issues.apache.org/jira/browse/FLINK-18202?filter=-4&jql=project%20%3D%20FLINK%20AND%20issuetype%20%3D%20%22New%20Feature%22%20AND%20text%20~%20protobuf%20order%20by%20created%20DESC

pr 見: https://github.com/apache/flink/pull/14376

這一節主要介紹 flink sql 中怎麼自定義實現 format ,其中以最常使用的 protobuf 作為案例來介紹。

如果想在本地直接測試下:

關於為什麼選擇 protobuf 可以看這篇文章,寫的很詳細:

http://hengyunabc.github.io/thinking-about-grpc-protobuf/?utm_source=tuicool&utm_medium=referral

在實時計算的領域中,為了可讀性會選擇 json ,為了效率以及一些已經依賴了 grpc 的公司會選擇 protobuf 來做數據序列化,那麼自然而然,日誌的序列化方式也會選擇 protobuf 。

而官方目前已經 release 的版本中是沒有提供 flink sql api 的 protobuf format 的。如下圖,基於 1.13 版本。

https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/

因此本文在介紹怎樣自定義一個 format 的同時,實現一個 protobuf format 來給大家使用。

預期效果是先實現幾種最基本的數據類型,包括 protobuf 中的 message (自定義 model)、 map (映射)、 repeated (列表)、其他基本數據類型等,這些都是我們最常使用的類型。

預期 protobuf message 定義如下:

測試數據源數據如下,博主把 protobuf 的數據轉換為 json,以方便展示,如下圖:

預期 flink sql:

數據源表 DDL:

數據匯表 DDL:

Transform 執行邏輯:

下面是我在本地跑的結果:

可以看到列印的結果,數據是正確的被反序列化讀入,並且最終輸出到 console。

目前業界可以參考的實現如下: https://github.com/maosuhan/flink-pb , 也就是這位哥們負責目前 flink protobuf 的 format。

這種實現的具體使用方式如下:

其實現有幾個特點:

[圖片上傳失敗...(image-66c35b-1644940704671)]

其實上節已經詳細描述了 flink sql 對於 sourcesinkformat 的載入機制。

如圖 serde format 是通過 TableFactoryHelper.discoverDecodingFormat 和 TableFactoryHelper.discoverEncodingFormat 創建的

所有通過 SPI 的 sourcesinkformt 插件都繼承自 Factory 。

整體創建 format 方法的調用鏈如下圖。

最終實現如下,涉及到了幾個實現類:

具體流程:

上述實現類的具體關系如下:

介紹完流程,進入具體實現方案細節:

ProtobufFormatFactory 主要創建 format 的邏輯:

resourcesMETA-INF 文件:

主要實現反序列化的邏輯:

可以注意到上述反序列化的主要邏輯就集中在 runtimeConverter 上,即 ProtobufToRowDataConverters.ProtobufToRowDataConverter 。

ProtobufToRowDataConverters.ProtobufToRowDataConverter 就是在 ProtobufToRowDataConverters 中定義的。

ProtobufToRowDataConverters.ProtobufToRowDataConverter 其實就是一個 convertor 介面:

其作用就是將 protobuf message 中的每一個欄位轉換成為 RowData 中的每一個欄位。

ProtobufToRowDataConverters 中就定義了具體轉換邏輯,如截圖所示,每一個 LogicalType 都定義了 protobuf message 欄位轉換為 flink 數據類型的邏輯:

源碼後台回復 flink sql 知其所以然(五)| 自定義 protobuf format 獲取。

本文主要是針對 flink sql protobuf format 進行了原理解釋以及對應的實現。
如果你正好需要這么一個 format,直接後台回復 flink sql 知其所以然(五)| 自定義 protobuf format 獲取源碼吧。

當然上述只是 protobuf format 一個基礎的實現,用於生產環境還有很多方面可以去擴展的。

② 大數據分析應該掌握哪些基礎知識

Java基礎語法

· 分支結構if/switch

· 循環結構for/while/do while

· 方法聲明和調用

· 方法重載

· 數組的使用

· 命令行參數、可變參數

IDEA

· IDEA常用設置、常用快捷鍵

· 自定義模板

· 關聯Tomcat

· Web項目案例實操

面向對象編程

· 封裝、繼承、多態、構造器、包

· 異常處理機制

· 抽象類、介面、內部類

· 常有基礎API、集合List/Set/Map

· 泛型、線程的創建和啟動

· 深入集合源碼分析、常見數據結構解析

· 線程的安全、同步和通信、IO流體系

· 反射、類的載入機制、網路編程

Java8/9/10/11新特性

· Lambda表達式、方法引用

· 構造器引用、StreamAPI

· jShell(JShell)命令

· 介面的私有方法、Optional加強

· 局部變數的類型推斷

· 更簡化的編譯運行程序等

MySQL

· DML語言、DDL語言、DCL語言

· 分組查詢、Join查詢、子查詢、Union查詢、函數

· 流程式控制制語句、事務的特點、事務的隔離級別等

JDBC

· 使用JDBC完成資料庫增刪改查操作

· 批處理的操作

· 資料庫連接池的原理及應用

· 常見資料庫連接池C3P0、DBCP、Druid等

Maven

· Maven環境搭建

· 本地倉庫&中央倉庫

· 創建Web工程

· 自動部署

· 持續繼承

· 持續部署

Linux

· VI/VIM編輯器

· 系統管理操作&遠程登錄

· 常用命令

· 軟體包管理&企業真題

Shell編程

· 自定義變數與特殊變數

· 運算符

· 條件判斷

· 流程式控制制

· 系統函數&自定義函數

· 常用工具命令

· 面試真題

Hadoop

· Hadoop生態介紹

· Hadoop運行模式

· 源碼編譯

· HDFS文件系統底層詳解

· DN&NN工作機制

· HDFS的API操作

· MapRece框架原理

· 數據壓縮

· Yarn工作機制

· MapRece案例詳解

· Hadoop參數調優

· HDFS存儲多目錄

· 多磁碟數據均衡

· LZO壓縮

· Hadoop基準測試

Zookeeper

· Zookeeper數據結果

· 內部原理

· 選舉機制

· Stat結構體

· 監聽器

· 分布式安裝部署

· API操作

· 實戰案例

· 面試真題

· 啟動停止腳本

HA+新特性

· HDFS-HA集群配置

Hive

· Hive架構原理

· 安裝部署

· 遠程連接

· 常見命令及基本數據類型

· DML數據操作

· 查詢語句

· Join&排序

· 分桶&函數

· 壓縮&存儲

· 企業級調優

· 實戰案例

· 面試真題

Flume

· Flume架構

· Agent內部原理

· 事務

· 安裝部署

· 實戰案例

· 自定義Source

· 自定義Sink

· Ganglia監控

Kafka

· 消息隊列

· Kafka架構

· 集群部署

· 命令行操作

· 工作流程分析

· 分區分配策略

· 數據寫入流程

· 存儲策略

· 高階API

· 低級API

· 攔截器

· 監控

· 高可靠性存儲

· 數據可靠性和持久性保證

· ISR機制

· Kafka壓測

· 機器數量計算

· 分區數計算

· 啟動停止腳本

DataX

· 安裝

· 原理

· 數據一致性

· 空值處理

· LZO壓縮處理

Scala

· Scala基礎入門

· 函數式編程

· 數據結構

· 面向對象編程

· 模式匹配

· 高階函數

· 特質

· 註解&類型參數

· 隱式轉換

· 高級類型

· 案例實操

Spark Core

· 安裝部署

· RDD概述

· 編程模型

· 持久化&檢查點機制

· DAG

· 運算元詳解

· RDD編程進階

· 累加器&廣播變數

Spark SQL

· SparkSQL

· DataFrame

· DataSet

· 自定義UDF&UDAF函數

Spark Streaming

· SparkStreaming

· 背壓機制原理

· Receiver和Direct模式原理

· Window原理及案例實操

· 7x24 不間斷運行&性能考量

Spark內核&優化

· 內核源碼詳解

· 優化詳解

Hbase

· Hbase原理及架構

· 數據讀寫流程

· API使用

· 與Hive和Sqoop集成

· 企業級調優

Presto

· Presto的安裝部署

· 使用Presto執行數倉項目的即席查詢模塊

Ranger2.0

· 許可權管理工具Ranger的安裝和使用

Azkaban3.0

· 任務調度工具Azkaban3.0的安裝部署

· 使用Azkaban進行項目任務調度,實現電話郵件報警

Kylin3.0

· Kylin的安裝部署

· Kylin核心思想

· 使用Kylin對接數據源構建模型

Atlas2.0

· 元數據管理工具Atlas的安裝部署

Zabbix

· 集群監控工具Zabbix的安裝部署

DolphinScheler

· 任務調度工具DolphinScheler的安裝部署

· 實現數倉項目任務的自動化調度、配置郵件報警

Superset

· 使用SuperSet對數倉項目的計算結果進行可視化展示

Echarts

· 使用Echarts對數倉項目的計算結果進行可視化展示

Redis

· Redis安裝部署

· 五大數據類型

· 總體配置

· 持久化

· 事務

· 發布訂閱

· 主從復制

Canal

· 使用Canal實時監控MySQL數據變化採集至實時項目

Flink

· 運行時架構

· 數據源Source

· Window API

· Water Mark

· 狀態編程

· CEP復雜事件處理

Flink SQL

· Flink SQL和Table API詳細解讀

Flink 內核

· Flink內核源碼講解

· 經典面試題講解

Git&GitHub

· 安裝配置

· 本地庫搭建

· 基本操作

· 工作流

· 集中式

ClickHouse

· ClickHouse的安裝部署

· 讀寫機制

· 數據類型

· 執行引擎

DataV

· 使用DataV對實時項目需求計算結果進行可視化展示

sugar

· 結合Springboot對接網路sugar實現數據可視化大屏展示

Maxwell

· 使用Maxwell實時監控MySQL數據變化採集至實時項目

ElasticSearch

· ElasticSearch索引基本操作、案例實操

Kibana

· 通過Kibana配置可視化分析

Springboot

· 利用Springboot開發可視化介面程序

③ flinksql-core-動態表

普通動態表是FlinkSQL中的一類表,表中的數據與連接的外部數據對等,可以簡單理解為把一張mysql的表放進flink內存中得到的表,並且該表與mysql表有連接關系,即該表可以讀寫mysql表。

需要聲明表的欄位定義和表屬性(連接器屬性)。語法如下:

with關鍵字前面的是欄位定義,with關鍵字後面的是表屬性。其中欄位定義時還可以聲明表主鍵,聲明語法為PARIMARY KEY(myColumn1,...) NOT ENFORCED, 這里的not enforced表示flinksql不會對主鍵做強制的唯一性約束、非空約束,而且目前flinksql中只支持這種類型的主鍵。
表屬性中有若干個屬性欄位需要聲明,具體有哪些屬性欄位取決於使用哪個連接器,如上述聲明中使用的是jdbc連接器,在使用該連接器時需要提供url、username、password等屬性,通過此連接器我們就可以讓該表能連接到對應的mysql表。

我們可以查詢flinksql普通動態表的數據,此數據與連接的外部數據是一致的。語法如下:

tips:在運行時,只會載入一次外部數據到flinksql普通動態表。後續外部數據表有更新時,flinksql的普通動態表不會跟著自動更新。

我們可以把數據寫入到flinksql動態表,從而實現寫入數據到外部系統的目的。語法如下:

④ Hive sql及窗口函數

hive函數:

1、根據指定條件返回結果:case when then else end as

2、基本類型轉換:CAST()

3、nvl:處理空欄位:三個str時,是否為空可以指定返回不同的值

4、sql通配符: https://www.w3school.com.cn/sql/sql_wildcards.asp

5、count(1)與COUNT(*):返回行數

如果表沒有主鍵,那麼count(1)比count(*)快;

如果有主鍵,那麼count(主鍵,聯合主鍵)比count(*)快;

count(1)跟count(主鍵)一樣,只掃描主鍵。count(*)跟count(非主鍵)一樣,掃描整個表。明顯前者更快一些。

性能問題:

1.任何情況下SELECT COUNT(*) FROM tablename是最優選擇,(指沒有where的情況);

2.盡量減少SELECT COUNT(*) FROM tablename WHERE COL = 『value』 這種查詢;

3.杜絕SELECT COUNT(COL) FROM tablename WHERE COL2 = 『value』 的出現。

count(expression):查詢 is_reply=0 的數量: SELECT COUNT(IF(is_reply=0,1,NULL)) count FROM t_iov_help_feedback;

6、distinct與group by

distinct去重所有distinct之後所有的欄位,如果有一個欄位值不一致就不作為一條

group by是根據某一欄位分組,然後查詢出該條數據的所需欄位,可以搭配 where max(time)或者Row_Number函數使用,求出最大的一條數據

7、使用with 臨時表名 as() 的形式,簡單的臨時表直接嵌套進sql中,復雜的和需要復用的表寫到臨時表中,關聯的時候先找到關聯欄位,過濾條件最好在臨時表中先過濾後關聯

處理json的函數:

split(json_array_string(schools), '\\|\\|') AS schools

get_json_object(school, '$.id') AS school_id,

字元串函數:

1、instr(』源字元串』 , 『目標字元串』 ,』開始位置』,』第幾次出現』)

instr(sourceString,destString,start,appearPosition)

1.sourceString代表源字元串; destString代表要從源字元串中查找的子串;

2.start代表查找的開始位置,這個參數可選的,默認為1;

3.appearPosition代表想從源字元中查找出第幾次出現的destString,這個參數也是可選的, 默認為1

4.如果start的值為負數,則代表從右往左進行查找,但是位置數據仍然從左向右計算。

5.返回值為:查找到的字元串的位置。如果沒有查找到,返回0。

最簡單例子: 在abcd中查找a的位置,從第一個字母開始查,查找第一次出現時的位置

select instr(『abcd』,』a』,1,1) from al; —1

應用於模糊查詢:instr(欄位名/列名, 『查找欄位』)

select code,name,dept,occupation from staff where instr(code, 『001』)> 0;

等同於 select code, name, dept, occupation from staff where code like 『%001%』 ;

應用於判斷包含關系:

select ccn,mas_loc from mas_loc where instr(『FH,FHH,FHM』,ccn)>0;

等同於 select ccn,mas_loc from mas_loc where ccn in (『FH』,』FHH』,』FHM』);

2、substr(string A,int start,int len)和 substring(string A,int start,int len),用法一樣

substr(time,1,8) 表示將time從第1位開始截取,截取的長度為8位

第一種用法:

substr(string A,int start)和 substring(string A,int start),用法一樣

功效:返回字元串A從下標start位置到結尾的字元串

第二種用法:

substr(string A,int start,int len)和 substring(string A,int start,int len),用法一樣

功效:返回字元串A從下標start位置開始,長度為len的字元串

3、get_json_object(form_data,'$.學生姓名') as student_name

json_tuple 函數的作用:用來解析json字元串中的多個欄位

4、split(full_name, '\\.') [5] AS zq;  取的是數組里的第六個

日期(時間)函數:

1、to_date(event_time) 返回日期部分

2、date_sub:返回當前日期的相對時間

當前日期:select curdate() 

當前日期前一天:select  date_sub(curdate(),interval 1 day)

當前日期後一天:select date_sub(curdate(),interval -1 day)

date_sub(from_unixtime(unix_timestamp(), 'yyyy-MM-dd HH:mm:ss'), 14)  將現在的時間總秒數轉為標准格式時間,返回14天之前的時間

時間戳>>>>日期:

from_unixtime(unix_timestamp(), 'yyyy-MM-dd HH:mm:ss') 將現在的時間總秒數轉為標准格式時間

from_unixtime(get_json_object(get_json_object(form_data,'$.挽單時間'),'$.$date')/1000) as retain_time

unix_timestamp('2019-08-15 16:40:00','yyyy-MM-dd HH:mm:ss')  --1565858400

日期>>>>時間戳:unix_timestamp()

date_format:yyyy-MM-dd HH:mm:ss 時間轉格式化時間

select date_format('2019-10-07 13:24:20', 'yyyyMMdd000000')-- 20191007000000select date_format('2019-10-07', 'yyyyMMdd000000')-- 20191007000000

1.日期比較函數: datediff語法: datediff(string enddate,string startdate) 

返回值: int 

說明: 返回結束日期減去開始日期的天數。 

舉例:  hive> select datediff('2016-12-30','2016-12-29');  1

2.日期增加函數: date_add語法: date_add(string startdate, intdays) 

返回值: string 

說明: 返回開始日期startdate增加days天後的日期。 

舉例:  hive>select date_add('2016-12-29',10);  2017-01-08

3.日期減少函數: date_sub語法: date_sub (string startdate,int days) 

返回值: string 

說明: 返回開始日期startdate減少days天後的日期。 

舉例:  hive>select date_sub('2016-12-29',10);  2016-12-19

4.查詢近30天的數據

select * from table where datediff(current_timestamp,create_time)<=30;

create_time 為table里的欄位,current_timestamp 返回當前時間 2018-06-01 11:00:00

3、trunc()函數的用法:當前日期的各種第一天,或者對數字進行不四捨五入的截取

日期:

1.select trunc(sysdate) from al  --2011-3-18  今天的日期為2011-3-18

2.select trunc(sysdate, 'mm')   from   al  --2011-3-1    返回當月第一天.

上月1號    trunc(add_months(current_date(),-1),'MM')

3.select trunc(sysdate,'yy') from al  --2011-1-1       返回當年第一天

4.select trunc(sysdate,'dd') from al  --2011-3-18    返回當前年月日

5.select trunc(sysdate,'yyyy') from al  --2011-1-1   返回當年第一天

6.select trunc(sysdate,'d') from al  --2011-3-13 (星期天)返回當前星期的第一天

7.select trunc(sysdate, 'hh') from al   --2011-3-18 14:00:00   當前時間為14:41  

8.select trunc(sysdate, 'mi') from al  --2011-3-18 14:41:00   TRUNC()函數沒有秒的精確

數字:TRUNC(number,num_digits) Number 需要截尾取整的數字。Num_digits 的默認值為 0。TRUNC()函數截取時不進行四捨五入

11.select trunc(123.458,1) from al --123.4

12.select trunc(123.458,-1) from al --120

4、round():四捨五入:

select round(1.455, 2)  #結果是:1.46,即四捨五入到十分位,也就是保留兩位小數

select round(1.5)  #默認四捨五入到個位,結果是:2

select round(255, -1)  #結果是:260,即四捨五入到十位,此時個位是5會進位

floor():地板數

ceil()天花板數

5、

6.日期轉年函數: year語法:   year(string date) 

返回值: int

說明: 返回日期中的年。

舉例:

hive>   select year('2011-12-08 10:03:01') from al;

2011

hive>   select year('2012-12-08') fromal;

2012

7.日期轉月函數: month語法: month   (string date) 

返回值: int

說明: 返回日期中的月份。

舉例:

hive>   select month('2011-12-08 10:03:01') from al;

12

hive>   select month('2011-08-08') fromal;

8

8.日期轉天函數: day語法: day   (string date) 

返回值: int

說明: 返回日期中的天。

舉例:

hive>   select day('2011-12-08 10:03:01') from al;

8

hive>   select day('2011-12-24') fromal;

24

9.日期轉小時函數: hour語法: hour   (string date) 

返回值: int

說明: 返回日期中的小時。

舉例:

hive>   select hour('2011-12-08 10:03:01') from al;

10

10.日期轉分鍾函數: minute語法: minute   (string date) 

返回值: int

說明: 返回日期中的分鍾。

舉例:

hive>   select minute('2011-12-08 10:03:01') from al;

3

11.日期轉秒函數: second語法: second   (string date) 

返回值: int

說明: 返回日期中的秒。

舉例:

hive>   select second('2011-12-08 10:03:01') from al;

1

12.日期轉周函數: weekofyear語法:   weekofyear (string date) 

返回值: int

說明: 返回日期在當前的周數。

舉例:

hive>   select weekofyear('2011-12-08 10:03:01') from al;

49

查看hive表在hdfs中的位置:show create table 表名;

在hive中hive2hive,hive2hdfs:

HDFS、本地、hive -----> Hive:使用 insert into | overwrite、loaddata local inpath "" into table student;

Hive ----> Hdfs、本地:使用:insert overwrite | local

網站訪問量統計:

uv:每用戶訪問次數

ip:每ip(可能很多人)訪問次數

PV:是指頁面的瀏覽次數

VV:是指你訪問網站的次數

sql:

基本函數:

count、max、min、sum、avg、like、rlike('2%'、'_2%'、%2%'、'[2]')(java正則)

and、or、not、in   

where、group by、having、{ join on 、full join}  、order by(desc降序)

sort by需要與distribut by集合結合使用:

hive (default)> set maprece.job.reces=3;  //先設置rece的數量 

insert overwrite local directory '/opt/mole/datas/distribute-by'

row format delimited fields terminated by '\t'

先按照部門編號分區,再按照員工編號降序排序。

select * from emp distribute by deptno sort by empno desc;

外部表  create external table if not exists dept

分區表:create table dept_partition ( deptno int, dname string, loc string )  partitioned by ( month string )

load data local inpath '/opt/mole/datas/dept.txt' into table default.dept_partition partition(month='201809'); 

 alter table dept_partition add/drop partition(month='201805') ,partition(month='201804');

多分區聯合查詢:union

select * from dept_partition2 where month='201809' and day='10';

show partitions dept_partition;

desc formatted dept_partition;

二級分區表:create table dept_partition2 ( deptno int, dname string, loc string ) partitioned by (month string, day string) row format delimited fields terminated by '\t';

分桶抽樣查詢:分區針對的是數據的存儲路徑;分桶針對的是數據文件

create table stu_buck(id int, name string) clustered by(id) into 4 bucketsrow format delimited fields terminated by '\t';

設置開啟分桶與rece為1:

set hive.enforce.bucketing=true;

set maprece.job.reces=-1;

分桶抽樣:select * from stu_bucktablesample(bucket x out of y on id);

抽取,桶數/y,x是從哪個桶開始抽取,y越大 抽樣數越少,y與抽樣數成反比,x必須小於y

給空欄位賦值:

如果員工的comm為NULL,則用-1代替或用其他欄位代替  :select nvl(comm,-1) from emp;

case when:如何符合記為1,用於統計、分組統計

select dept_id, sum(case sex when '男' then 1 else 0 end) man , sum(case sex when '女' then 1 else 0 end) woman from emp_sex group by dept_id;

用於組合歸類匯總(行轉列):UDAF:多轉一

concat:拼接查詢結果

collect_set(col):去重匯總,產生array類型欄位,類似於distinct

select t.base, concat_ws('|',collect_set(t.name))   from (select concat_ws(',',xingzuo,blood_type) base,name  from person_info) t group by t.base;

解釋:先第一次查詢得到一張沒有按照(星座血型)分組的表,然後分組,使用collect_set將名字組合成數組,然後使用concat將數組變成字元串

用於拆分數據:(列轉行):UDTF:一轉多

explode(col):將hive一列中復雜的array或者map結構拆分成多行。

lateral view  側面顯示:用於和UDTF一對多函數搭配使用

用法:lateral view udtf(expression) tablealias as cate

cate:炸開之後的列別名

temptable :臨時表表名

解釋:用於和split, explode等UDTF一起使用,它能夠將一列數據拆成多行數據,在此基礎上可以對拆分後的數據進行聚合。

開窗函數:

Row_Number,Rank,Dense_Rank  over:針對統計查詢使用

Row_Number:返回從1開始的序列

Rank:生成分組中的排名序號,會在名詞s中留下空位。3 3 5

dense_rank:生成分組中的排名序號,不會在名詞中留下空位。3 3 4

over:主要是分組排序,搭配窗口函數使用

結果:

SUM、AVG、MIN、MAX、count

preceding:往前

following:往後

current row:當前行

unbounded:unbounded preceding 從前面的起點, unbounded following:到後面的終點

sum:直接使用sum是總的求和,結合over使用可統計至每一行的結果、總的結果、當前行+之前多少行/之後多少行、當前行到往後所有行的求和。

over(rowsbetween 3/current )  當前行到往後所有行的求和

ntile:分片,結合over使用,可以給數據分片,返回分片號

使用場景:統計出排名前百分之或n分之一的數據。

lead,lag,FIRST_VALUE,LAST_VALUE

lag與lead函數可以返回上下行的數據

lead(col,n,dafault) 用於統計窗口內往下第n行值

第一個參數為列名,第二個參數為往下第n行(可選,默認為1),第三個參數為默認值(當往下第n行為NULL時候,取默認值,如不指定,則為NULL)

LAG(col,n,DEFAULT) 用於統計窗口內往上第n行值

第一個參數為列名,第二個參數為往上第n行(可選,默認為1),第三個參數為默認值(當往上第n行為NULL時候,取默認值,如不指定,則為NULL)

使用場景:通常用於統計某用戶在某個網頁上的停留時間

FIRST_VALUE:取分組內排序後,截止到當前行,第一個值

LAST_VALUE:取分組內排序後,截止到當前行,最後一個值

范圍內求和: https://blog.csdn.net/happyrocking/article/details/105369558

cume_dist,percent_rank

–CUME_DIST :小於等於當前值的 行數 / 分組內總行數

–比如,統計小於等於當前薪水的人數,占總人數的比例

percent_rank:分組內當前行的RANK值-1/分組內總行數-1

總結:

在Spark中使用spark sql與hql一致,也可以直接使用sparkAPI實現。

HiveSql窗口函數主要應用於求TopN,分組排序TopN、TopN求和,前多少名前百分之幾。

與Flink窗口函數不同。

Flink中的窗口是用於將無線數據流切分為有限塊處理的手段。

window分類:

CountWindow:按照指定的數據條數生成一個 Window,與時間無關。

TimeWindow:按照時間生成 Window。

1. 滾動窗口(Tumbling Windows):時間對齊,窗口長度固定,不重疊::常用於時間段內的聚合計算

2.滑動窗口(Sliding Windows):時間對齊,窗口長度固定,可以有重疊::適用於一段時間內的統計(某介面最近 5min 的失敗率來報警)

3. 會話窗口(Session Windows)無時間對齊,無長度,不重疊::設置session間隔,超過時間間隔則窗口關閉。

⑤ Flink:特性、概念、組件棧、架構及原理分析

簡單之美 | Apache Flink:特性、概念、組件棧、架構及原理分析
http://shiyanjun.cn/archives/1508.html

Apache Flink是一個面向分布式數據流處理和批量數據處理的開源計算平台,它能夠基於同一個Flink運行時(Flink Runtime),提供支持流處理和批處理兩種類型應用的功能。現有的開源計算方案,會把流處理和批處理作為兩種不同的應用類型,因為他們它們所提供的SLA是完全不相同的:流處理一般需要支持低延遲、Exactly-once保證,而批處理需要支持高吞吐、高效處理,所以在實現的時候通常是分別給出兩套實現方法,或者通過一個獨立的開源框架來實現其中每一種處理方案。例如,實現批處理的開源方案有MapRece、Tez、Crunch、Spark,實現流處理的開源方案有Samza、Storm。Flink在實現流處理和批處理時,與傳統的一些方案完全不同,它從另一個視角看待流處理和批處理,將二者統一起來:Flink是完全支持流處理,也就是說作為流處理看待時輸入數據流是無界的;批處理被作為一種特殊的流處理,只是它的輸入數據流被定義為有界的。基於同一個Flink運行時(Flink Runtime),分別提供了流處理和批處理API,而這兩種API也是實現上層面向流處理、批處理類型應用框架的基礎。
基本特性
關於Flink所支持的特性,我這里只是通過分類的方式簡單做一下梳理,涉及到具體的一些概念及其原理會在後面的部分做詳細說明。
流處理特性
支持高吞吐、低延遲、高性能的流處理
支持帶有事件時間的窗口(Window)操作
支持有狀態計算的Exactly-once語義
支持高度靈活的窗口(Window)操作,支持基於time、count、session,以及data-driven的窗口操作
支持具有Backpressure功能的持續流模型
支持基於輕量級分布式快照(Snapshot)實現的容錯
一個運行時同時支持Batch on Streaming處理和Streaming處理
Flink在JVM內部實現了自己的內存管理
支持迭代計算
支持程序自動優化:避免特定情況下Shuffle、排序等昂貴操作,中間結果有必要進行緩存

API支持
對Streaming數據類應用,提供DataStream API
對批處理類應用,提供DataSet API(支持Java/Scala)

Libraries支持
支持機器學習(FlinkML)
支持圖分析(Gelly)
支持關系數據處理(Table)
支持復雜事件處理(CEP)

整合支持
支持Flink on YARN
支持HDFS
支持來自Kafka的輸入數據
支持Apache HBase
支持Hadoop程序
支持Tachyon
支持ElasticSearch
支持RabbitMQ
支持Apache Storm
支持S3
支持XtreemFS

基本概念
Stream & Transformation & Operator
用戶實現的Flink程序是由Stream和Transformation這兩個基本構建塊組成,其中Stream是一個中間結果數據,而Transformation是一個操作,它對一個或多個輸入Stream進行計算處理,輸出一個或多個結果Stream。當一個Flink程序被執行的時候,它會被映射為Streaming Dataflow。一個Streaming Dataflow是由一組Stream和Transformation Operator組成,它類似於一個DAG圖,在啟動的時候從一個或多個Source Operator開始,結束於一個或多個Sink Operator。下面是一個由Flink程序映射為Streaming Dataflow的示意圖,如下所示:

比如從Source[1]到map()[1],它保持了Source的分區特性(Partitioning)和分區內元素處理的有序性,也就是說map()[1]的Subtask看到數據流中記錄的順序,與Source[1]中看到的記錄順序是一致的。
Redistribution模式

這種模式改變了輸入數據流的分區,比如從map()[1]、map()[2]到keyBy()/window()/apply()[1]、keyBy()/window()/apply()[2],上游的Subtask向下游的多個不同的Subtask發送數據,改變了數據流的分區,這與實際應用所選擇的Operator有關系。另外,Source Operator對應2個Subtask,所以並行度為2,而Sink Operator的Subtask只有1個,故而並行度為1。
Task & Operator Chain
在Flink分布式執行環境中,會將多個Operator Subtask串起來組成一個Operator Chain,實際上就是一個執行鏈,每個執行鏈會在TaskManager上一個獨立的線程中執行,如下圖所示:

在Flink集群啟動的時候,TaskManager會向JobManager注冊,如果注冊成功,則JobManager會向TaskManager回復消息AcknowledgeRegistration。
SubmitJob

Flink程序內部通過Client向JobManager提交Flink Job,其中在消息SubmitJob中以JobGraph形式描述了Job的基本信息。
CancelJob

請求取消一個Flink Job的執行,CancelJob消息中包含了Job的ID,如果成功則返回消息CancellationSuccess,失敗則返回消息CancellationFailure。
UpdateTaskExecutionState

TaskManager會向JobManager請求更新ExecutionGraph中的ExecutionVertex的狀態信息,更新成功則返回true。
RequestNextInputSplit

運行在TaskManager上面的Task,請求獲取下一個要處理的輸入Split,成功則返回NextInputSplit。
JobStatusChanged

ExecutionGraph向JobManager發送該消息,用來表示Flink Job的狀態發生的變化,例如:RUNNING、CANCELING、FINISHED等。
TaskManager
TaskManager也是一個Actor,它是實際負責執行計算的Worker,在其上執行Flink Job的一組Task。每個TaskManager負責管理其所在節點上的資源信息,如內存、磁碟、網路,在啟動的時候將資源的狀態向JobManager匯報。TaskManager端可以分成兩個階段:
注冊階段

TaskManager會向JobManager注冊,發送RegisterTaskManager消息,等待JobManager返回AcknowledgeRegistration,然後TaskManager就可以進行初始化過程。
可操作階段

該階段TaskManager可以接收並處理與Task有關的消息,如SubmitTask、CancelTask、FailTask。如果TaskManager無法連接到JobManager,這是TaskManager就失去了與JobManager的聯系,會自動進入「注冊階段」,只有完成注冊才能繼續處理Task相關的消息。
Client
當用戶提交一個Flink程序時,會首先創建一個Client,該Client首先會對用戶提交的Flink程序進行預處理,並提交到Flink集群中處理,所以Client需要從用戶提交的Flink程序配置中獲取JobManager的地址,並建立到JobManager的連接,將Flink Job提交給JobManager。Client會將用戶提交的Flink程序組裝一個JobGraph, 並且是以JobGraph的形式提交的。一個JobGraph是一個Flink Dataflow,它由多個JobVertex組成的DAG。其中,一個JobGraph包含了一個Flink程序的如下信息:JobID、Job名稱、配置信息、一組JobVertex等。
組件棧
Flink是一個分層架構的系統,每一層所包含的組件都提供了特定的抽象,用來服務於上層組件。Flink分層的組件棧如下圖所示:

了解YARN的話,對上圖的原理非常熟悉,實際Flink也實現了滿足在YARN集群上運行的各個組件:Flink YARN Client負責與YARN RM通信協商資源請求,Flink JobManager和Flink TaskManager分別申請到Container去運行各自的進程。通過上圖可以看到,YARN AM與Flink JobManager在同一個Container中,這樣AM可以知道Flink JobManager的地址,從而AM可以申請Container去啟動Flink TaskManager。待Flink成功運行在YARN集群上,Flink YARN Client就可以提交Flink Job到Flink JobManager,並進行後續的映射、調度和計算處理。
Runtime層

Runtime層提供了支持Flink計算的全部核心實現,比如:支持分布式Stream處理、JobGraph到ExecutionGraph的映射、調度等等,為上層API層提供基礎服務。
API層

API層主要實現了面向無界Stream的流處理和面向Batch的批處理API,其中面向流處理對應DataStream API,面向批處理對應DataSet API。
Libraries層

該層也可以稱為Flink應用框架層,根據API層的劃分,在API層之上構建的滿足特定應用的實現計算框架,也分別對應於面向流處理和面向批處理兩類。面向流處理支持:CEP(復雜事件處理)、基於SQL-like的操作(基於Table的關系操作);面向批處理支持:FlinkML(機器學習庫)、Gelly(圖處理)。
內部原理
容錯機制
Flink基於Checkpoint機制實現容錯,它的原理是不斷地生成分布式Streaming數據流Snapshot。在流處理失敗時,通過這些Snapshot可以恢復數據流處理。理解Flink的容錯機制,首先需要了解一下Barrier這個概念:Stream Barrier是Flink分布式Snapshotting中的核心元素,它會作為數據流的記錄被同等看待,被插入到數據流中,將數據流中記錄的進行分組,並沿著數據流的方向向前推進。每個Barrier會攜帶一個Snapshot ID,屬於該Snapshot的記錄會被推向該Barrier的前方。因為Barrier非常輕量,所以並不會中斷數據流。帶有Barrier的數據流,如下圖所示:

接收到Barrier n的Stream被臨時擱置,來自這些Stream的記錄不會被處理,而是被放在一個Buffer中
一旦最後一個Stream接收到Barrier n,Operator會emit所有暫存在Buffer中的記錄,然後向Checkpoint Coordinator發送Snapshot n
繼續處理來自多個Stream的記錄

基於Stream Aligning操作能夠實現Exactly Once語義,但是也會給流處理應用帶來延遲,因為為了排列對齊Barrier,會暫時緩存一部分Stream的記錄到Buffer中,尤其是在數據流並行度很高的場景下可能更加明顯,通常以最遲對齊Barrier的一個Stream為處理Buffer中緩存記錄的時刻點。在Flink中,提供了一個開關,選擇是否使用Stream Aligning,如果關掉則Exactly Once會變成At least once。
調度機制
在JobManager端,會接收到Client提交的JobGraph形式的Flink Job,JobManager會將一個JobGraph轉換映射為一個ExecutionGraph,如下圖所示:

迭代機制
機器學習和圖計算應用,都會使用到迭代計算,Flink通過在迭代Operator中定義Step函數來實現迭代演算法,這種迭代演算法包括Iterate和Delta Iterate兩種類型,在實現上它們反復地在當前迭代狀態上調用Step函數,直到滿足給定的條件才會停止迭代。下面,對Iterate和Delta Iterate兩種類型的迭代演算法原理進行說明:
Iterate

Iterate Operator是一種簡單的迭代形式:每一輪迭代,Step函數的輸入或者是輸入的整個數據集,或者是上一輪迭代的結果,通過該輪迭代計算出下一輪計算所需要的輸入(也稱為Next Partial Solution),滿足迭代的終止條件後,會輸出最終迭代結果,具體執行流程如下圖所示:

Delta Iterate Operator實現了增量迭代,它的實現原理如下圖所示:

另外,Flink還提供了3個參數來配置Backpressure監控行為:
參數名稱
默認值
說明

jobmanager.web.backpressure.refresh-interval
60000
默認1分鍾,表示采樣統計結果刷新時間間隔

jobmanager.web.backpressure.num-samples
100
評估Backpressure狀態,所使用的堆棧跟蹤調用次數

jobmanager.web.backpressure.delay-between-samples
50
默認50毫秒,表示對一個Job的每個Task依次調用的時間間隔

通過上面個定義的Backpressure狀態,以及調整相應的參數,可以確定當前運行的Job的狀態是否正常,並且保證不影響JobManager提供服務。
參考鏈接
http://flink.apache.org/
http://flink.apache.org/features.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/general_arch.html
http://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/
https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/stream_checkpointing.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/job_scheling.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/event_time.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/yarn_setup.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/jobmanager_high_availability.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/libs/cep.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/libs/gelly.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/libs/ml/index.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/libs/table.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/connectors.html
http://geek.csdn.net/news/detail/56272
http://samza.apache.org/