① Flink sql 知其所以然(五)| 自定義 protobuf format
protobuf 作為目前各大公司中最廣泛使用的高效的協議數據交換格式工具庫,會大量作為流式數據傳輸的序列化方式,所以在 flink sql 中如果能實現 protobuf 的 format 會非常有用( 目前社區已經有對應的實現,不過目前還沒有 merge,預計在 1.14 系列版本中能 release )。
issue 見: https://issues.apache.org/jira/browse/FLINK-18202?filter=-4&jql=project%20%3D%20FLINK%20AND%20issuetype%20%3D%20%22New%20Feature%22%20AND%20text%20~%20protobuf%20order%20by%20created%20DESC
pr 見: https://github.com/apache/flink/pull/14376
這一節主要介紹 flink sql 中怎麼自定義實現 format ,其中以最常使用的 protobuf 作為案例來介紹。
如果想在本地直接測試下:
關於為什麼選擇 protobuf 可以看這篇文章,寫的很詳細:
http://hengyunabc.github.io/thinking-about-grpc-protobuf/?utm_source=tuicool&utm_medium=referral
在實時計算的領域中,為了可讀性會選擇 json ,為了效率以及一些已經依賴了 grpc 的公司會選擇 protobuf 來做數據序列化,那麼自然而然,日誌的序列化方式也會選擇 protobuf 。
而官方目前已經 release 的版本中是沒有提供 flink sql api 的 protobuf format 的。如下圖,基於 1.13 版本。
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/
因此本文在介紹怎樣自定義一個 format 的同時,實現一個 protobuf format 來給大家使用。
預期效果是先實現幾種最基本的數據類型,包括 protobuf 中的 message (自定義 model)、 map (映射)、 repeated (列表)、其他基本數據類型等,這些都是我們最常使用的類型。
預期 protobuf message 定義如下:
測試數據源數據如下,博主把 protobuf 的數據轉換為 json,以方便展示,如下圖:
預期 flink sql:
數據源表 DDL:
數據匯表 DDL:
Transform 執行邏輯:
下面是我在本地跑的結果:
可以看到列印的結果,數據是正確的被反序列化讀入,並且最終輸出到 console。
目前業界可以參考的實現如下: https://github.com/maosuhan/flink-pb , 也就是這位哥們負責目前 flink protobuf 的 format。
這種實現的具體使用方式如下:
其實現有幾個特點:
[圖片上傳失敗...(image-66c35b-1644940704671)]
其實上節已經詳細描述了 flink sql 對於 sourcesinkformat 的載入機制。
如圖 serde format 是通過 TableFactoryHelper.discoverDecodingFormat 和 TableFactoryHelper.discoverEncodingFormat 創建的
所有通過 SPI 的 sourcesinkformt 插件都繼承自 Factory 。
整體創建 format 方法的調用鏈如下圖。
最終實現如下,涉及到了幾個實現類:
具體流程:
上述實現類的具體關系如下:
介紹完流程,進入具體實現方案細節:
ProtobufFormatFactory 主要創建 format 的邏輯:
resourcesMETA-INF 文件:
主要實現反序列化的邏輯:
可以注意到上述反序列化的主要邏輯就集中在 runtimeConverter 上,即 ProtobufToRowDataConverters.ProtobufToRowDataConverter 。
ProtobufToRowDataConverters.ProtobufToRowDataConverter 就是在 ProtobufToRowDataConverters 中定義的。
ProtobufToRowDataConverters.ProtobufToRowDataConverter 其實就是一個 convertor 介面:
其作用就是將 protobuf message 中的每一個欄位轉換成為 RowData 中的每一個欄位。
ProtobufToRowDataConverters 中就定義了具體轉換邏輯,如截圖所示,每一個 LogicalType 都定義了 protobuf message 欄位轉換為 flink 數據類型的邏輯:
源碼後台回復 flink sql 知其所以然(五)| 自定義 protobuf format 獲取。
本文主要是針對 flink sql protobuf format 進行了原理解釋以及對應的實現。
如果你正好需要這么一個 format,直接後台回復 flink sql 知其所以然(五)| 自定義 protobuf format 獲取源碼吧。
當然上述只是 protobuf format 一個基礎的實現,用於生產環境還有很多方面可以去擴展的。
② 大數據分析應該掌握哪些基礎知識
Java基礎語法
· 分支結構if/switch
· 循環結構for/while/do while
· 方法聲明和調用
· 方法重載
· 數組的使用
· 命令行參數、可變參數
IDEA
· IDEA常用設置、常用快捷鍵
· 自定義模板
· 關聯Tomcat
· Web項目案例實操
面向對象編程
· 封裝、繼承、多態、構造器、包
· 異常處理機制
· 抽象類、介面、內部類
· 常有基礎API、集合List/Set/Map
· 泛型、線程的創建和啟動
· 深入集合源碼分析、常見數據結構解析
· 線程的安全、同步和通信、IO流體系
· 反射、類的載入機制、網路編程
Java8/9/10/11新特性
· Lambda表達式、方法引用
· 構造器引用、StreamAPI
· jShell(JShell)命令
· 介面的私有方法、Optional加強
· 局部變數的類型推斷
· 更簡化的編譯運行程序等
MySQL
· DML語言、DDL語言、DCL語言
· 分組查詢、Join查詢、子查詢、Union查詢、函數
· 流程式控制制語句、事務的特點、事務的隔離級別等
JDBC
· 使用JDBC完成資料庫增刪改查操作
· 批處理的操作
· 資料庫連接池的原理及應用
· 常見資料庫連接池C3P0、DBCP、Druid等
Maven
· Maven環境搭建
· 本地倉庫&中央倉庫
· 創建Web工程
· 自動部署
· 持續繼承
· 持續部署
Linux
· VI/VIM編輯器
· 系統管理操作&遠程登錄
· 常用命令
· 軟體包管理&企業真題
Shell編程
· 自定義變數與特殊變數
· 運算符
· 條件判斷
· 流程式控制制
· 系統函數&自定義函數
· 常用工具命令
· 面試真題
Hadoop
· Hadoop生態介紹
· Hadoop運行模式
· 源碼編譯
· HDFS文件系統底層詳解
· DN&NN工作機制
· HDFS的API操作
· MapRece框架原理
· 數據壓縮
· Yarn工作機制
· MapRece案例詳解
· Hadoop參數調優
· HDFS存儲多目錄
· 多磁碟數據均衡
· LZO壓縮
· Hadoop基準測試
Zookeeper
· Zookeeper數據結果
· 內部原理
· 選舉機制
· Stat結構體
· 監聽器
· 分布式安裝部署
· API操作
· 實戰案例
· 面試真題
· 啟動停止腳本
HA+新特性
· HDFS-HA集群配置
Hive
· Hive架構原理
· 安裝部署
· 遠程連接
· 常見命令及基本數據類型
· DML數據操作
· 查詢語句
· Join&排序
· 分桶&函數
· 壓縮&存儲
· 企業級調優
· 實戰案例
· 面試真題
Flume
· Flume架構
· Agent內部原理
· 事務
· 安裝部署
· 實戰案例
· 自定義Source
· 自定義Sink
· Ganglia監控
Kafka
· 消息隊列
· Kafka架構
· 集群部署
· 命令行操作
· 工作流程分析
· 分區分配策略
· 數據寫入流程
· 存儲策略
· 高階API
· 低級API
· 攔截器
· 監控
· 高可靠性存儲
· 數據可靠性和持久性保證
· ISR機制
· Kafka壓測
· 機器數量計算
· 分區數計算
· 啟動停止腳本
DataX
· 安裝
· 原理
· 數據一致性
· 空值處理
· LZO壓縮處理
Scala
· Scala基礎入門
· 函數式編程
· 數據結構
· 面向對象編程
· 模式匹配
· 高階函數
· 特質
· 註解&類型參數
· 隱式轉換
· 高級類型
· 案例實操
Spark Core
· 安裝部署
· RDD概述
· 編程模型
· 持久化&檢查點機制
· DAG
· 運算元詳解
· RDD編程進階
· 累加器&廣播變數
Spark SQL
· SparkSQL
· DataFrame
· DataSet
· 自定義UDF&UDAF函數
Spark Streaming
· SparkStreaming
· 背壓機制原理
· Receiver和Direct模式原理
· Window原理及案例實操
· 7x24 不間斷運行&性能考量
Spark內核&優化
· 內核源碼詳解
· 優化詳解
Hbase
· Hbase原理及架構
· 數據讀寫流程
· API使用
· 與Hive和Sqoop集成
· 企業級調優
Presto
· Presto的安裝部署
· 使用Presto執行數倉項目的即席查詢模塊
Ranger2.0
· 許可權管理工具Ranger的安裝和使用
Azkaban3.0
· 任務調度工具Azkaban3.0的安裝部署
· 使用Azkaban進行項目任務調度,實現電話郵件報警
Kylin3.0
· Kylin的安裝部署
· Kylin核心思想
· 使用Kylin對接數據源構建模型
Atlas2.0
· 元數據管理工具Atlas的安裝部署
Zabbix
· 集群監控工具Zabbix的安裝部署
DolphinScheler
· 任務調度工具DolphinScheler的安裝部署
· 實現數倉項目任務的自動化調度、配置郵件報警
Superset
· 使用SuperSet對數倉項目的計算結果進行可視化展示
Echarts
· 使用Echarts對數倉項目的計算結果進行可視化展示
Redis
· Redis安裝部署
· 五大數據類型
· 總體配置
· 持久化
· 事務
· 發布訂閱
· 主從復制
Canal
· 使用Canal實時監控MySQL數據變化採集至實時項目
Flink
· 運行時架構
· 數據源Source
· Window API
· Water Mark
· 狀態編程
· CEP復雜事件處理
Flink SQL
· Flink SQL和Table API詳細解讀
Flink 內核
· Flink內核源碼講解
· 經典面試題講解
Git&GitHub
· 安裝配置
· 本地庫搭建
· 基本操作
· 工作流
· 集中式
ClickHouse
· ClickHouse的安裝部署
· 讀寫機制
· 數據類型
· 執行引擎
DataV
· 使用DataV對實時項目需求計算結果進行可視化展示
sugar
· 結合Springboot對接網路sugar實現數據可視化大屏展示
Maxwell
· 使用Maxwell實時監控MySQL數據變化採集至實時項目
ElasticSearch
· ElasticSearch索引基本操作、案例實操
Kibana
· 通過Kibana配置可視化分析
Springboot
· 利用Springboot開發可視化介面程序
③ flinksql-core-動態表
普通動態表是FlinkSQL中的一類表,表中的數據與連接的外部數據對等,可以簡單理解為把一張mysql的表放進flink內存中得到的表,並且該表與mysql表有連接關系,即該表可以讀寫mysql表。
需要聲明表的欄位定義和表屬性(連接器屬性)。語法如下:
with關鍵字前面的是欄位定義,with關鍵字後面的是表屬性。其中欄位定義時還可以聲明表主鍵,聲明語法為PARIMARY KEY(myColumn1,...) NOT ENFORCED, 這里的not enforced表示flinksql不會對主鍵做強制的唯一性約束、非空約束,而且目前flinksql中只支持這種類型的主鍵。
表屬性中有若干個屬性欄位需要聲明,具體有哪些屬性欄位取決於使用哪個連接器,如上述聲明中使用的是jdbc連接器,在使用該連接器時需要提供url、username、password等屬性,通過此連接器我們就可以讓該表能連接到對應的mysql表。
我們可以查詢flinksql普通動態表的數據,此數據與連接的外部數據是一致的。語法如下:
tips:在運行時,只會載入一次外部數據到flinksql普通動態表。後續外部數據表有更新時,flinksql的普通動態表不會跟著自動更新。
我們可以把數據寫入到flinksql動態表,從而實現寫入數據到外部系統的目的。語法如下:
④ Hive sql及窗口函數
hive函數:
1、根據指定條件返回結果:case when then else end as
2、基本類型轉換:CAST()
3、nvl:處理空欄位:三個str時,是否為空可以指定返回不同的值
4、sql通配符: https://www.w3school.com.cn/sql/sql_wildcards.asp
5、count(1)與COUNT(*):返回行數
如果表沒有主鍵,那麼count(1)比count(*)快;
如果有主鍵,那麼count(主鍵,聯合主鍵)比count(*)快;
count(1)跟count(主鍵)一樣,只掃描主鍵。count(*)跟count(非主鍵)一樣,掃描整個表。明顯前者更快一些。
性能問題:
1.任何情況下SELECT COUNT(*) FROM tablename是最優選擇,(指沒有where的情況);
2.盡量減少SELECT COUNT(*) FROM tablename WHERE COL = 『value』 這種查詢;
3.杜絕SELECT COUNT(COL) FROM tablename WHERE COL2 = 『value』 的出現。
count(expression):查詢 is_reply=0 的數量: SELECT COUNT(IF(is_reply=0,1,NULL)) count FROM t_iov_help_feedback;
6、distinct與group by
distinct去重所有distinct之後所有的欄位,如果有一個欄位值不一致就不作為一條
group by是根據某一欄位分組,然後查詢出該條數據的所需欄位,可以搭配 where max(time)或者Row_Number函數使用,求出最大的一條數據
7、使用with 臨時表名 as() 的形式,簡單的臨時表直接嵌套進sql中,復雜的和需要復用的表寫到臨時表中,關聯的時候先找到關聯欄位,過濾條件最好在臨時表中先過濾後關聯
處理json的函數:
split(json_array_string(schools), '\\|\\|') AS schools
get_json_object(school, '$.id') AS school_id,
字元串函數:
1、instr(』源字元串』 , 『目標字元串』 ,』開始位置』,』第幾次出現』)
instr(sourceString,destString,start,appearPosition)
1.sourceString代表源字元串; destString代表要從源字元串中查找的子串;
2.start代表查找的開始位置,這個參數可選的,默認為1;
3.appearPosition代表想從源字元中查找出第幾次出現的destString,這個參數也是可選的, 默認為1
4.如果start的值為負數,則代表從右往左進行查找,但是位置數據仍然從左向右計算。
5.返回值為:查找到的字元串的位置。如果沒有查找到,返回0。
最簡單例子: 在abcd中查找a的位置,從第一個字母開始查,查找第一次出現時的位置
select instr(『abcd』,』a』,1,1) from al; —1
應用於模糊查詢:instr(欄位名/列名, 『查找欄位』)
select code,name,dept,occupation from staff where instr(code, 『001』)> 0;
等同於 select code, name, dept, occupation from staff where code like 『%001%』 ;
應用於判斷包含關系:
select ccn,mas_loc from mas_loc where instr(『FH,FHH,FHM』,ccn)>0;
等同於 select ccn,mas_loc from mas_loc where ccn in (『FH』,』FHH』,』FHM』);
2、substr(string A,int start,int len)和 substring(string A,int start,int len),用法一樣
substr(time,1,8) 表示將time從第1位開始截取,截取的長度為8位
第一種用法:
substr(string A,int start)和 substring(string A,int start),用法一樣
功效:返回字元串A從下標start位置到結尾的字元串
第二種用法:
substr(string A,int start,int len)和 substring(string A,int start,int len),用法一樣
功效:返回字元串A從下標start位置開始,長度為len的字元串
3、get_json_object(form_data,'$.學生姓名') as student_name
json_tuple 函數的作用:用來解析json字元串中的多個欄位
4、split(full_name, '\\.') [5] AS zq; 取的是數組里的第六個
日期(時間)函數:
1、to_date(event_time) 返回日期部分
2、date_sub:返回當前日期的相對時間
當前日期:select curdate()
當前日期前一天:select date_sub(curdate(),interval 1 day)
當前日期後一天:select date_sub(curdate(),interval -1 day)
date_sub(from_unixtime(unix_timestamp(), 'yyyy-MM-dd HH:mm:ss'), 14) 將現在的時間總秒數轉為標准格式時間,返回14天之前的時間
時間戳>>>>日期:
from_unixtime(unix_timestamp(), 'yyyy-MM-dd HH:mm:ss') 將現在的時間總秒數轉為標准格式時間
from_unixtime(get_json_object(get_json_object(form_data,'$.挽單時間'),'$.$date')/1000) as retain_time
unix_timestamp('2019-08-15 16:40:00','yyyy-MM-dd HH:mm:ss') --1565858400
日期>>>>時間戳:unix_timestamp()
date_format:yyyy-MM-dd HH:mm:ss 時間轉格式化時間
select date_format('2019-10-07 13:24:20', 'yyyyMMdd000000')-- 20191007000000select date_format('2019-10-07', 'yyyyMMdd000000')-- 20191007000000
1.日期比較函數: datediff語法: datediff(string enddate,string startdate)
返回值: int
說明: 返回結束日期減去開始日期的天數。
舉例: hive> select datediff('2016-12-30','2016-12-29'); 1
2.日期增加函數: date_add語法: date_add(string startdate, intdays)
返回值: string
說明: 返回開始日期startdate增加days天後的日期。
舉例: hive>select date_add('2016-12-29',10); 2017-01-08
3.日期減少函數: date_sub語法: date_sub (string startdate,int days)
返回值: string
說明: 返回開始日期startdate減少days天後的日期。
舉例: hive>select date_sub('2016-12-29',10); 2016-12-19
4.查詢近30天的數據
select * from table where datediff(current_timestamp,create_time)<=30;
create_time 為table里的欄位,current_timestamp 返回當前時間 2018-06-01 11:00:00
3、trunc()函數的用法:當前日期的各種第一天,或者對數字進行不四捨五入的截取
日期:
1.select trunc(sysdate) from al --2011-3-18 今天的日期為2011-3-18
2.select trunc(sysdate, 'mm') from al --2011-3-1 返回當月第一天.
上月1號 trunc(add_months(current_date(),-1),'MM')
3.select trunc(sysdate,'yy') from al --2011-1-1 返回當年第一天
4.select trunc(sysdate,'dd') from al --2011-3-18 返回當前年月日
5.select trunc(sysdate,'yyyy') from al --2011-1-1 返回當年第一天
6.select trunc(sysdate,'d') from al --2011-3-13 (星期天)返回當前星期的第一天
7.select trunc(sysdate, 'hh') from al --2011-3-18 14:00:00 當前時間為14:41
8.select trunc(sysdate, 'mi') from al --2011-3-18 14:41:00 TRUNC()函數沒有秒的精確
數字:TRUNC(number,num_digits) Number 需要截尾取整的數字。Num_digits 的默認值為 0。TRUNC()函數截取時不進行四捨五入
11.select trunc(123.458,1) from al --123.4
12.select trunc(123.458,-1) from al --120
4、round():四捨五入:
select round(1.455, 2) #結果是:1.46,即四捨五入到十分位,也就是保留兩位小數
select round(1.5) #默認四捨五入到個位,結果是:2
select round(255, -1) #結果是:260,即四捨五入到十位,此時個位是5會進位
floor():地板數
ceil()天花板數
5、
6.日期轉年函數: year語法: year(string date)
返回值: int
說明: 返回日期中的年。
舉例:
hive> select year('2011-12-08 10:03:01') from al;
2011
hive> select year('2012-12-08') fromal;
2012
7.日期轉月函數: month語法: month (string date)
返回值: int
說明: 返回日期中的月份。
舉例:
hive> select month('2011-12-08 10:03:01') from al;
12
hive> select month('2011-08-08') fromal;
8
8.日期轉天函數: day語法: day (string date)
返回值: int
說明: 返回日期中的天。
舉例:
hive> select day('2011-12-08 10:03:01') from al;
8
hive> select day('2011-12-24') fromal;
24
9.日期轉小時函數: hour語法: hour (string date)
返回值: int
說明: 返回日期中的小時。
舉例:
hive> select hour('2011-12-08 10:03:01') from al;
10
10.日期轉分鍾函數: minute語法: minute (string date)
返回值: int
說明: 返回日期中的分鍾。
舉例:
hive> select minute('2011-12-08 10:03:01') from al;
3
11.日期轉秒函數: second語法: second (string date)
返回值: int
說明: 返回日期中的秒。
舉例:
hive> select second('2011-12-08 10:03:01') from al;
1
12.日期轉周函數: weekofyear語法: weekofyear (string date)
返回值: int
說明: 返回日期在當前的周數。
舉例:
hive> select weekofyear('2011-12-08 10:03:01') from al;
49
查看hive表在hdfs中的位置:show create table 表名;
在hive中hive2hive,hive2hdfs:
HDFS、本地、hive -----> Hive:使用 insert into | overwrite、loaddata local inpath "" into table student;
Hive ----> Hdfs、本地:使用:insert overwrite | local
網站訪問量統計:
uv:每用戶訪問次數
ip:每ip(可能很多人)訪問次數
PV:是指頁面的瀏覽次數
VV:是指你訪問網站的次數
sql:
基本函數:
count、max、min、sum、avg、like、rlike('2%'、'_2%'、%2%'、'[2]')(java正則)
and、or、not、in
where、group by、having、{ join on 、full join} 、order by(desc降序)
sort by需要與distribut by集合結合使用:
hive (default)> set maprece.job.reces=3; //先設置rece的數量
insert overwrite local directory '/opt/mole/datas/distribute-by'
row format delimited fields terminated by '\t'
先按照部門編號分區,再按照員工編號降序排序。
select * from emp distribute by deptno sort by empno desc;
外部表 create external table if not exists dept
分區表:create table dept_partition ( deptno int, dname string, loc string ) partitioned by ( month string )
load data local inpath '/opt/mole/datas/dept.txt' into table default.dept_partition partition(month='201809');
alter table dept_partition add/drop partition(month='201805') ,partition(month='201804');
多分區聯合查詢:union
select * from dept_partition2 where month='201809' and day='10';
show partitions dept_partition;
desc formatted dept_partition;
二級分區表:create table dept_partition2 ( deptno int, dname string, loc string ) partitioned by (month string, day string) row format delimited fields terminated by '\t';
分桶抽樣查詢:分區針對的是數據的存儲路徑;分桶針對的是數據文件
create table stu_buck(id int, name string) clustered by(id) into 4 bucketsrow format delimited fields terminated by '\t';
設置開啟分桶與rece為1:
set hive.enforce.bucketing=true;
set maprece.job.reces=-1;
分桶抽樣:select * from stu_bucktablesample(bucket x out of y on id);
抽取,桶數/y,x是從哪個桶開始抽取,y越大 抽樣數越少,y與抽樣數成反比,x必須小於y
給空欄位賦值:
如果員工的comm為NULL,則用-1代替或用其他欄位代替 :select nvl(comm,-1) from emp;
case when:如何符合記為1,用於統計、分組統計
select dept_id, sum(case sex when '男' then 1 else 0 end) man , sum(case sex when '女' then 1 else 0 end) woman from emp_sex group by dept_id;
用於組合歸類匯總(行轉列):UDAF:多轉一
concat:拼接查詢結果
collect_set(col):去重匯總,產生array類型欄位,類似於distinct
select t.base, concat_ws('|',collect_set(t.name)) from (select concat_ws(',',xingzuo,blood_type) base,name from person_info) t group by t.base;
解釋:先第一次查詢得到一張沒有按照(星座血型)分組的表,然後分組,使用collect_set將名字組合成數組,然後使用concat將數組變成字元串
用於拆分數據:(列轉行):UDTF:一轉多
explode(col):將hive一列中復雜的array或者map結構拆分成多行。
lateral view 側面顯示:用於和UDTF一對多函數搭配使用
用法:lateral view udtf(expression) tablealias as cate
cate:炸開之後的列別名
temptable :臨時表表名
解釋:用於和split, explode等UDTF一起使用,它能夠將一列數據拆成多行數據,在此基礎上可以對拆分後的數據進行聚合。
開窗函數:
Row_Number,Rank,Dense_Rank over:針對統計查詢使用
Row_Number:返回從1開始的序列
Rank:生成分組中的排名序號,會在名詞s中留下空位。3 3 5
dense_rank:生成分組中的排名序號,不會在名詞中留下空位。3 3 4
over:主要是分組排序,搭配窗口函數使用
結果:
SUM、AVG、MIN、MAX、count
preceding:往前
following:往後
current row:當前行
unbounded:unbounded preceding 從前面的起點, unbounded following:到後面的終點
sum:直接使用sum是總的求和,結合over使用可統計至每一行的結果、總的結果、當前行+之前多少行/之後多少行、當前行到往後所有行的求和。
over(rowsbetween 3/current ) 當前行到往後所有行的求和
ntile:分片,結合over使用,可以給數據分片,返回分片號
使用場景:統計出排名前百分之或n分之一的數據。
lead,lag,FIRST_VALUE,LAST_VALUE
lag與lead函數可以返回上下行的數據
lead(col,n,dafault) 用於統計窗口內往下第n行值
第一個參數為列名,第二個參數為往下第n行(可選,默認為1),第三個參數為默認值(當往下第n行為NULL時候,取默認值,如不指定,則為NULL)
LAG(col,n,DEFAULT) 用於統計窗口內往上第n行值
第一個參數為列名,第二個參數為往上第n行(可選,默認為1),第三個參數為默認值(當往上第n行為NULL時候,取默認值,如不指定,則為NULL)
使用場景:通常用於統計某用戶在某個網頁上的停留時間
FIRST_VALUE:取分組內排序後,截止到當前行,第一個值
LAST_VALUE:取分組內排序後,截止到當前行,最後一個值
范圍內求和: https://blog.csdn.net/happyrocking/article/details/105369558
cume_dist,percent_rank
–CUME_DIST :小於等於當前值的 行數 / 分組內總行數
–比如,統計小於等於當前薪水的人數,占總人數的比例
percent_rank:分組內當前行的RANK值-1/分組內總行數-1
總結:
在Spark中使用spark sql與hql一致,也可以直接使用sparkAPI實現。
HiveSql窗口函數主要應用於求TopN,分組排序TopN、TopN求和,前多少名前百分之幾。
與Flink窗口函數不同。
Flink中的窗口是用於將無線數據流切分為有限塊處理的手段。
window分類:
CountWindow:按照指定的數據條數生成一個 Window,與時間無關。
TimeWindow:按照時間生成 Window。
1. 滾動窗口(Tumbling Windows):時間對齊,窗口長度固定,不重疊::常用於時間段內的聚合計算
2.滑動窗口(Sliding Windows):時間對齊,窗口長度固定,可以有重疊::適用於一段時間內的統計(某介面最近 5min 的失敗率來報警)
3. 會話窗口(Session Windows)無時間對齊,無長度,不重疊::設置session間隔,超過時間間隔則窗口關閉。
⑤ Flink:特性、概念、組件棧、架構及原理分析
簡單之美 | Apache Flink:特性、概念、組件棧、架構及原理分析
http://shiyanjun.cn/archives/1508.html
Apache Flink是一個面向分布式數據流處理和批量數據處理的開源計算平台,它能夠基於同一個Flink運行時(Flink Runtime),提供支持流處理和批處理兩種類型應用的功能。現有的開源計算方案,會把流處理和批處理作為兩種不同的應用類型,因為他們它們所提供的SLA是完全不相同的:流處理一般需要支持低延遲、Exactly-once保證,而批處理需要支持高吞吐、高效處理,所以在實現的時候通常是分別給出兩套實現方法,或者通過一個獨立的開源框架來實現其中每一種處理方案。例如,實現批處理的開源方案有MapRece、Tez、Crunch、Spark,實現流處理的開源方案有Samza、Storm。Flink在實現流處理和批處理時,與傳統的一些方案完全不同,它從另一個視角看待流處理和批處理,將二者統一起來:Flink是完全支持流處理,也就是說作為流處理看待時輸入數據流是無界的;批處理被作為一種特殊的流處理,只是它的輸入數據流被定義為有界的。基於同一個Flink運行時(Flink Runtime),分別提供了流處理和批處理API,而這兩種API也是實現上層面向流處理、批處理類型應用框架的基礎。
基本特性
關於Flink所支持的特性,我這里只是通過分類的方式簡單做一下梳理,涉及到具體的一些概念及其原理會在後面的部分做詳細說明。
流處理特性
支持高吞吐、低延遲、高性能的流處理
支持帶有事件時間的窗口(Window)操作
支持有狀態計算的Exactly-once語義
支持高度靈活的窗口(Window)操作,支持基於time、count、session,以及data-driven的窗口操作
支持具有Backpressure功能的持續流模型
支持基於輕量級分布式快照(Snapshot)實現的容錯
一個運行時同時支持Batch on Streaming處理和Streaming處理
Flink在JVM內部實現了自己的內存管理
支持迭代計算
支持程序自動優化:避免特定情況下Shuffle、排序等昂貴操作,中間結果有必要進行緩存
API支持
對Streaming數據類應用,提供DataStream API
對批處理類應用,提供DataSet API(支持Java/Scala)
Libraries支持
支持機器學習(FlinkML)
支持圖分析(Gelly)
支持關系數據處理(Table)
支持復雜事件處理(CEP)
整合支持
支持Flink on YARN
支持HDFS
支持來自Kafka的輸入數據
支持Apache HBase
支持Hadoop程序
支持Tachyon
支持ElasticSearch
支持RabbitMQ
支持Apache Storm
支持S3
支持XtreemFS
基本概念
Stream & Transformation & Operator
用戶實現的Flink程序是由Stream和Transformation這兩個基本構建塊組成,其中Stream是一個中間結果數據,而Transformation是一個操作,它對一個或多個輸入Stream進行計算處理,輸出一個或多個結果Stream。當一個Flink程序被執行的時候,它會被映射為Streaming Dataflow。一個Streaming Dataflow是由一組Stream和Transformation Operator組成,它類似於一個DAG圖,在啟動的時候從一個或多個Source Operator開始,結束於一個或多個Sink Operator。下面是一個由Flink程序映射為Streaming Dataflow的示意圖,如下所示:
比如從Source[1]到map()[1],它保持了Source的分區特性(Partitioning)和分區內元素處理的有序性,也就是說map()[1]的Subtask看到數據流中記錄的順序,與Source[1]中看到的記錄順序是一致的。
Redistribution模式
這種模式改變了輸入數據流的分區,比如從map()[1]、map()[2]到keyBy()/window()/apply()[1]、keyBy()/window()/apply()[2],上游的Subtask向下游的多個不同的Subtask發送數據,改變了數據流的分區,這與實際應用所選擇的Operator有關系。另外,Source Operator對應2個Subtask,所以並行度為2,而Sink Operator的Subtask只有1個,故而並行度為1。
Task & Operator Chain
在Flink分布式執行環境中,會將多個Operator Subtask串起來組成一個Operator Chain,實際上就是一個執行鏈,每個執行鏈會在TaskManager上一個獨立的線程中執行,如下圖所示:
在Flink集群啟動的時候,TaskManager會向JobManager注冊,如果注冊成功,則JobManager會向TaskManager回復消息AcknowledgeRegistration。
SubmitJob
Flink程序內部通過Client向JobManager提交Flink Job,其中在消息SubmitJob中以JobGraph形式描述了Job的基本信息。
CancelJob
請求取消一個Flink Job的執行,CancelJob消息中包含了Job的ID,如果成功則返回消息CancellationSuccess,失敗則返回消息CancellationFailure。
UpdateTaskExecutionState
TaskManager會向JobManager請求更新ExecutionGraph中的ExecutionVertex的狀態信息,更新成功則返回true。
RequestNextInputSplit
運行在TaskManager上面的Task,請求獲取下一個要處理的輸入Split,成功則返回NextInputSplit。
JobStatusChanged
ExecutionGraph向JobManager發送該消息,用來表示Flink Job的狀態發生的變化,例如:RUNNING、CANCELING、FINISHED等。
TaskManager
TaskManager也是一個Actor,它是實際負責執行計算的Worker,在其上執行Flink Job的一組Task。每個TaskManager負責管理其所在節點上的資源信息,如內存、磁碟、網路,在啟動的時候將資源的狀態向JobManager匯報。TaskManager端可以分成兩個階段:
注冊階段
TaskManager會向JobManager注冊,發送RegisterTaskManager消息,等待JobManager返回AcknowledgeRegistration,然後TaskManager就可以進行初始化過程。
可操作階段
該階段TaskManager可以接收並處理與Task有關的消息,如SubmitTask、CancelTask、FailTask。如果TaskManager無法連接到JobManager,這是TaskManager就失去了與JobManager的聯系,會自動進入「注冊階段」,只有完成注冊才能繼續處理Task相關的消息。
Client
當用戶提交一個Flink程序時,會首先創建一個Client,該Client首先會對用戶提交的Flink程序進行預處理,並提交到Flink集群中處理,所以Client需要從用戶提交的Flink程序配置中獲取JobManager的地址,並建立到JobManager的連接,將Flink Job提交給JobManager。Client會將用戶提交的Flink程序組裝一個JobGraph, 並且是以JobGraph的形式提交的。一個JobGraph是一個Flink Dataflow,它由多個JobVertex組成的DAG。其中,一個JobGraph包含了一個Flink程序的如下信息:JobID、Job名稱、配置信息、一組JobVertex等。
組件棧
Flink是一個分層架構的系統,每一層所包含的組件都提供了特定的抽象,用來服務於上層組件。Flink分層的組件棧如下圖所示:
了解YARN的話,對上圖的原理非常熟悉,實際Flink也實現了滿足在YARN集群上運行的各個組件:Flink YARN Client負責與YARN RM通信協商資源請求,Flink JobManager和Flink TaskManager分別申請到Container去運行各自的進程。通過上圖可以看到,YARN AM與Flink JobManager在同一個Container中,這樣AM可以知道Flink JobManager的地址,從而AM可以申請Container去啟動Flink TaskManager。待Flink成功運行在YARN集群上,Flink YARN Client就可以提交Flink Job到Flink JobManager,並進行後續的映射、調度和計算處理。
Runtime層
Runtime層提供了支持Flink計算的全部核心實現,比如:支持分布式Stream處理、JobGraph到ExecutionGraph的映射、調度等等,為上層API層提供基礎服務。
API層
API層主要實現了面向無界Stream的流處理和面向Batch的批處理API,其中面向流處理對應DataStream API,面向批處理對應DataSet API。
Libraries層
該層也可以稱為Flink應用框架層,根據API層的劃分,在API層之上構建的滿足特定應用的實現計算框架,也分別對應於面向流處理和面向批處理兩類。面向流處理支持:CEP(復雜事件處理)、基於SQL-like的操作(基於Table的關系操作);面向批處理支持:FlinkML(機器學習庫)、Gelly(圖處理)。
內部原理
容錯機制
Flink基於Checkpoint機制實現容錯,它的原理是不斷地生成分布式Streaming數據流Snapshot。在流處理失敗時,通過這些Snapshot可以恢復數據流處理。理解Flink的容錯機制,首先需要了解一下Barrier這個概念:Stream Barrier是Flink分布式Snapshotting中的核心元素,它會作為數據流的記錄被同等看待,被插入到數據流中,將數據流中記錄的進行分組,並沿著數據流的方向向前推進。每個Barrier會攜帶一個Snapshot ID,屬於該Snapshot的記錄會被推向該Barrier的前方。因為Barrier非常輕量,所以並不會中斷數據流。帶有Barrier的數據流,如下圖所示:
接收到Barrier n的Stream被臨時擱置,來自這些Stream的記錄不會被處理,而是被放在一個Buffer中
一旦最後一個Stream接收到Barrier n,Operator會emit所有暫存在Buffer中的記錄,然後向Checkpoint Coordinator發送Snapshot n
繼續處理來自多個Stream的記錄
基於Stream Aligning操作能夠實現Exactly Once語義,但是也會給流處理應用帶來延遲,因為為了排列對齊Barrier,會暫時緩存一部分Stream的記錄到Buffer中,尤其是在數據流並行度很高的場景下可能更加明顯,通常以最遲對齊Barrier的一個Stream為處理Buffer中緩存記錄的時刻點。在Flink中,提供了一個開關,選擇是否使用Stream Aligning,如果關掉則Exactly Once會變成At least once。
調度機制
在JobManager端,會接收到Client提交的JobGraph形式的Flink Job,JobManager會將一個JobGraph轉換映射為一個ExecutionGraph,如下圖所示:
迭代機制
機器學習和圖計算應用,都會使用到迭代計算,Flink通過在迭代Operator中定義Step函數來實現迭代演算法,這種迭代演算法包括Iterate和Delta Iterate兩種類型,在實現上它們反復地在當前迭代狀態上調用Step函數,直到滿足給定的條件才會停止迭代。下面,對Iterate和Delta Iterate兩種類型的迭代演算法原理進行說明:
Iterate
Iterate Operator是一種簡單的迭代形式:每一輪迭代,Step函數的輸入或者是輸入的整個數據集,或者是上一輪迭代的結果,通過該輪迭代計算出下一輪計算所需要的輸入(也稱為Next Partial Solution),滿足迭代的終止條件後,會輸出最終迭代結果,具體執行流程如下圖所示:
Delta Iterate Operator實現了增量迭代,它的實現原理如下圖所示:
另外,Flink還提供了3個參數來配置Backpressure監控行為:
參數名稱
默認值
說明
jobmanager.web.backpressure.refresh-interval
60000
默認1分鍾,表示采樣統計結果刷新時間間隔
jobmanager.web.backpressure.num-samples
100
評估Backpressure狀態,所使用的堆棧跟蹤調用次數
jobmanager.web.backpressure.delay-between-samples
50
默認50毫秒,表示對一個Job的每個Task依次調用的時間間隔
通過上面個定義的Backpressure狀態,以及調整相應的參數,可以確定當前運行的Job的狀態是否正常,並且保證不影響JobManager提供服務。
參考鏈接
http://flink.apache.org/
http://flink.apache.org/features.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/general_arch.html
http://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/
https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/stream_checkpointing.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/job_scheling.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/event_time.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/yarn_setup.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/jobmanager_high_availability.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/libs/cep.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/libs/gelly.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/libs/ml/index.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/libs/table.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/connectors.html
http://geek.csdn.net/news/detail/56272
http://samza.apache.org/