① Spark sql一列拆分多列
將DataFrame中的一列拆分為多列,示例如下:
② Spark SQL CBO 基於代價的優化
Spark CBO 背景
本文將介紹 CBO,它充分考慮了數據本身的特點(如大小、分布)以及操作運算元的特點(中間結果集的分布及大小)及代價,從而更好的選擇執行代價最小的物理執行計劃,即 SparkPlan。
Spark CBO 原理
CBO 原理是計算所有可能的物理計劃的代價,並挑選出代價最小的物理執行計劃。其核心在於評估一個給定的物理執行計劃的代價。
物理執行計劃是一個樹狀結構,其代價等於每個執行節點的代價總合,如下圖所示。
而每個執行節點的代價,分為兩個部分
每個操作運算元的代價相對固定,可用規則來描述。而執行節點輸出數據集的大小與分布,分為兩個部分:1) 初始數據集,也即原始表,其數據集的大小與分布可直接通過統計得到;2)中間節點輸出數據集的大小與分布可由其輸入數據集的信息與操作本身的特點推算。
所以,最終主要需要解決兩個問題
Statistics 收集
通過如下 SQL 語句,可計算出整個表的記錄總數以及總大小
從如下示例中,Statistics 一行可見, customer 表數據總大小為 37026233 位元組,即 35.3MB,總記錄數為 28萬,與事實相符。
通過如下 SQL 語句,可計算出指定列的統計信息
從如下示例可見,customer 表的 c_customer_sk 列最小值為 1, 最大值為 280000,null 值個數為 0,不同值個數為 274368,平均列長度為 8,最大列長度為 8。
除上述示例中的統計信息外,Spark CBO 還直接等高直方圖。在上例中,histogram 為 NULL。其原因是,spark.sql.statistics.histogram.enabled 默認值為 false,也即 ANALYZE 時默認不計算及存儲 histogram。
下例中,通過 SET spark.sql.statistics.histogram.enabled=true; 啟用 histogram 後,完整的統計信息如下。
從上圖可見,生成的 histogram 為 equal-height histogram,且高度為 1102.36,bin 數為 254。其中 bin 個數可由 spark.sql.statistics.histogram.numBins 配置。對於每個 bin,勻記錄其最小值,最大值,以及 distinct count。
值得注意的是,這里的 distinct count 並不是精確值,而是通過 HyperLogLog 計算出來的近似值。使用 HyperLogLog 的原因有二
運算元對數據集影響估計
對於中間運算元,可以根據輸入數據集的統計信息以及運算元的特性,可以估算出輸出數據集的統計結果。
本節以 Filter 為例說明運算元對數據集的影響。
對於常見的 Column A < value B Filter,可通過如下方式估算輸出中間結果的統計信息
上述估算的前提是,欄位 A 數據均勻分布。但很多時候,數據分布並不均勻,且當數據傾斜嚴重是,上述估算誤差較大。此時,可充分利用 histogram 進行更精確的估算
啟用 Historgram 後,Filter Column A < value B 的估算方法為
在上圖中,B.value = 15,A.min = 0,A.max = 32,bin 個數為 10。Filter 後 A.ndv = ndv(<B.value) = ndv(<15)。該值可根據 A < 15 的 5 個 bin 的 ndv 通過 HyperLogLog 合並而得,無須重新計算所有 A < 15 的數據。
運算元代價估計
SQL 中常見的操作有 Selection(由 select 語句表示),Filter(由 where 語句表示)以及笛卡爾乘積(由 join 語句表示)。其中代價最高的是 join。
Spark SQL 的 CBO 通過如下方法估算 join 的代價
其中 rows 即記錄行數代表了 CPU 代價,size 代表了 IO 代價。weight 由 *spark.sql.cbo.joinReorder.card.weight *決定,其默認值為 0.7。
Build側選擇
對於兩表Hash Join,一般選擇小表作為build size,構建哈希表,另一邊作為 probe side。未開啟 CBO 時,根據表原始數據大小選擇 t2 作為build side
而開啟 CBO 後,基於估計的代價選擇 t1 作為 build side。更適合本例
優化 Join 類型
Spark SQL 中,Join 可分為 Shuffle based Join 和 BroadcastJoin。Shuffle based Join 需要引入 Shuffle,代價相對較高。BroadcastJoin 無須 Join,但要求至少有一張表足夠小,能通過 Spark 的 Broadcast 機制廣播到每個 Executor 中。
在不開啟 CBO 中,Spark SQL 通過 spark.sql.autoBroadcastJoinThreshold 判斷是否啟用 BroadcastJoin。其默認值為 10485760 即 10 MB。
並且該判斷基於參與 Join 的表的原始大小。
在下圖示例中,Table 1 大小為 1 TB,Table 2 大小為 20 GB,因此在對二者進行 join 時,由於二者都遠大於自動 BroatcastJoin 的閾值,因此 Spark SQL 在未開啟 CBO 時選用 SortMergeJoin 對二者進行 Join。
而開啟 CBO 後,由於 Table 1 經過 Filter 1 後結果集大小為 500 GB,Table 2 經過 Filter 2 後結果集大小為 10 MB 低於自動 BroatcastJoin 閾值,因此 Spark SQL 選用 BroadcastJoin。
優化多表 Join 順序
未開啟 CBO 時,Spark SQL 按 SQL 中 join 順序進行 Join。極端情況下,整個 Join 可能是 left-deep tree。在下圖所示 TPC-DS Q25 中,多路 Join 存在如下問題,因此耗時 241 秒。
開啟 CBO 後, Spark SQL 將執行計劃優化如下
優化後的 Join 有如下優勢,因此執行時間降至 71 秒
總結
5萬人關注的大數據成神之路,不來了解一下嗎?
5萬人關注的大數據成神之路,真的不來了解一下嗎?
5萬人關注的大數據成神之路,確定真的不來了解一下嗎?
③ Spark Sql 函數使用
round - 保留數據精度
如 round(col("col1"),0) 對應數值為 21.23 -> 21.0 ;21.73 -> 22.0
如 round(col("col1"),1) 對應數值為 21.23 -> 21.2
如 round(col("col1"),-1) 對應數值為 21.23 -> 20.0
④ Spark SQL(十):Hive On Spark
Hive是目前大數據領域,事實上的SQL標准。其底層默認是基於MapRece實現的,但是由於MapRece速度實在比較慢,因此這幾年,陸續出來了新的SQL查詢引擎,包括Spark SQL,Hive On Tez,Hive On Spark等。
Spark SQL與Hive On Spark是不一樣的。Spark SQL是Spark自己研發出來的針對各種數據源,包括Hive、JSON、Parquet、JDBC、RDD等都可以執行查詢的,一套基於Spark計算引擎的查詢引擎。因此它是Spark的一個項目,只不過提供了針對Hive執行查詢的工功能而已,適合在一些使用Spark技術棧的大數據應用類系統中使用。
而Hive On Spark,是Hive的一個項目,它是將Spark作為底層的查詢引擎(不通過MapRece作為唯一的查詢引擎)。Hive On Spark,只適用於Hive,在可預見的未來,很有可能Hive默認的底層引擎就從MapRece切換為Spark了;適合於將原有的Hive數據倉庫以及數據統計分析替換為Spark引擎,作為全公司通用的大數據統計分析引擎。
Hive On Spark做了一些優化:
1、Map Join
Spark SQL默認對join是支持使用broadcast機制將小表廣播到各個節點上,以進行join的。但是問題是,這會給Driver和Worker帶來很大的內存開銷。因為廣播的數據要一直保留在Driver內存中。所以目前採取的是,類似乎MapRece的Distributed Cache機制,即提高HDFS replica factor的復制因子,以讓數據在每個計算節點上都有一個備份,從而可以在本地進行數據讀取。
2、Cache Table
對於某些需要對一張表執行多次操作的場景,Hive On Spark內部做了優化,即將要多次操作的表cache到內存中,以便於提升性能。但是這里要注意,並不是對所有的情況都會自動進行cache。所以說,Hive On Spark還有很多不完善的地方。
Hive QL語句 =>
語法分析 => AST =>
生成邏輯執行計劃 => Operator Tree =>
優化邏輯執行計劃 => Optimized Operator Tree =>
生成物理執行計劃 => Task Tree =>
優化物理執行計劃 => Optimized Task Tree =>
執行優化後的Optimized Task Tree
⑤ Learning Spark [6] - Spark SQL高級函數
collect常用的局磨有兩個函數:collect_list(不去重)和collect_set(去重)
collect_list
collect_set
explode的定義是將桐緩斗數組的每個數據展開,如下我們就可以將上面的dataframe還原為最初的樣式。哪罩
posexplode可以在拆分列的同時,增加一列序號
但是如果表內有如下兩個一一對應的數組,我們該如何拆分呢?
按照直覺,我們嘗試分別explode()
解決這個問題,我們需要使用 LATERAL VIEW
lateral view可以理解為創建了一個表,然後JOIN到了查詢的表上,這樣就避免了兩個生成器的問題
split則是將一個字元串根據分隔符,變化為一個數組
transform會引用一個函數在數組的每個元素上,返回一個數列
filter為通過條件刪選,返回一個數列
exists為判斷是否包含該元素,返回一個布爾值
rece為通過兩個函數,將數組聚合為一個值,然後對該值進行運算
Reference
Learning Spark 2nd - Lightning Fast Big Data Analysis by Jules S. Damji, Brooke Wenig, Tathagata Das, and Denny Lee
⑥ spark sql啟動後執行越來越慢是為什麼
Shark為了實現Hive兼容,在HQL方面森悶重用了Hive中HQL的解析、邏輯執行計劃翻譯、執行計劃優化等邏輯,可以近似認為僅將物理執行計劃從MR作業替換成了Spark作業(輔以內存列式存儲等各種和Hive關系不大的優化);同時還依賴Hive Metastore和Hive SerDe(用於兼容現有的各種Hive存儲格式)。這一策略導致了兩個問題,第一是執行計劃優化完全依賴於Hive,不方便昌簡添加新的優化策略;二是因為MR是進程級並行,寫代碼的時候不是很注意線程安全問題,導致Shark不得不使用另外一套獨立維耐春褲護的打了補丁的Hive源碼分支(至於為何相關修改沒有合並到Hive主線,我也不太清楚)。
此外,除了兼容HQL、加速現有Hive數據的查詢分析以外,Spark SQL還支持直接對原生RDD對象進行關系查詢。同時,除了HQL以外,Spark SQL還內建了一個精簡的SQL parser,以及一套Scala DSL。也就是說,如果只是使用Spark SQL內建的SQL方言或Scala DSL對原生RDD對象進行關系查詢,用戶在開發Spark應用時完全不需要依賴Hive的任何東西。
⑦ 大數據如何入門
首先我們要了解Java語言和Linux操作系統,這兩個是學習大數據的基礎,學習的順序不分前後。
大數據
Java :只要了解一些基礎即可,做大數據不需要很深的Java 技術,學java SE 就相當於有學習大數據基礎。
Linux:因為大數據相關軟體都是在Linux上運行的,所以Linux要學習的扎實一些,學好Linux對你快速掌握大數據相關技術會有很大的幫助,能讓你更好的理解hadoop、hive、hbase、spark等大數據軟體的運行環境和網路環境配置,能少踩很多坑,學會shell就能看懂腳本這樣能更容易理解和配置大數據集群。還能讓你對以後新出的大數據技術學習起來更快。
Hadoop:這是現在流行的大數據處理平台幾乎已經成為大數據的代名詞,所以這個是必學的。Hadoop裡麵包括幾個組件HDFS、MapRece和YARN,HDFS是存儲數據的地方就像我們電腦的硬碟一樣文件都存儲在這個上面,MapRece是對數據進行處理計算的,它有個特點就是不管多大的數據只要給它時間它就能把數據跑完,但是時間可能不是很快所以它叫數據的批處理。
Zookeeper:這是個萬金油,安裝Hadoop的HA的時候就會用到它,以後的Hbase也會用到它。它一般用來存放一些相互協作的信息,這些信息比較小一般不會超過1M,都是使用它的軟體對它有依賴,對於我們個人來講只需要把它安裝正確,讓它正常的run起來就可以了。
Mysql:我們學習完大數據的處理了,接下來學習學習小數據的處理工具mysql資料庫,因為一會裝hive的時候要用到,mysql需要掌握到什麼層度那?你能在Linux上把它安裝好,運行起來,會配置簡單的許可權,修改root的密碼,創建資料庫。這里主要的是學習SQL的語法,因為hive的語法和這個非常相似。
Sqoop:這個是用於把Mysql里的數據導入到Hadoop里的。當然你也可以不用這個,直接把Mysql數據表導出成文件再放到HDFS上也是一樣的,當然生產環境中使用要注意Mysql的壓力。
Hive:這個東西對於會SQL語法的來說就是神器,它能讓你處理大數據變的很簡單,不會再費勁的編寫MapRece程序。有的人說Pig那?它和Pig差不多掌握一個就可以了。
Oozie:既然學會Hive了,我相信你一定需要這個東西,它可以幫你管理你的Hive或者MapRece、Spark腳本,還能檢查你的程序是否執行正確,出錯了給你發報警並能幫你重試程序,最重要的是還能幫你配置任務的依賴關系。我相信你一定會喜歡上它的,不然你看著那一大堆腳本,和密密麻麻的crond是不是有種想屎的感覺。
Hbase:這是Hadoop生態體系中的NOSQL資料庫,他的數據是按照key和value的形式存儲的並且key是唯一的,所以它能用來做數據的排重,它與MYSQL相比能存儲的數據量大很多。所以他常被用於大數據處理完成之後的存儲目的地。
Kafka:這是個比較好用的隊列工具,隊列是干嗎的?排隊買票你知道不?數據多了同樣也需要排隊處理,這樣與你協作的其它同學不會叫起來,你干嗎給我這么多的數據(比如好幾百G的文件)我怎麼處理得過來,你別怪他因為他不是搞大數據的,你可以跟他講我把數據放在隊列里你使用的時候一個個拿,這樣他就不在抱怨了馬上灰流流的去優化他的程序去了,因為處理不過來就是他的事情。而不是你給的問題。當然我們也可以利用這個工具來做線上實時數據的入庫或入HDFS,這時你可以與一個叫Flume的工具配合使用,它是專門用來提供對數據進行簡單處理,並寫到各種數據接受方(比如Kafka)的。
Spark:它是用來彌補基於MapRece處理數據速度上的缺點,它的特點是把數據裝載到內存中計算而不是去讀慢的要死進化還特別慢的硬碟。特別適合做迭代運算,所以演算法流們特別稀飯它。它是用scala編寫的。Java語言或者Scala都可以操作它,因為它們都是用JVM的。
⑧ 如何避免Spark SQL做數據導入時產生大量小文件
生產上,我們往往將Spark SQL作為Hive的替代方案,來獲得SQL on Hadoop更出色的性能。因此,本文所講的是指存儲於HDFS中小文件,即指文件的大小遠小於HDFS上塊(dfs.block.size)大小的文件。
比如我們拿TPCDS測試集中的store_sales進行舉例, sql如下所示
首先我們得到其執行計劃,如下所示,
store_sales的原生文件包含1616邏輯分片,對應生成1616 個Spark Task,插入動態分區表之後生成1824個數據分區加一個NULL值的分區,每個分區下都有可能生成1616個文件,這種情況下,最終的文件數量極有可能達到2949200。1T的測試集store_sales也就大概300g,這種情況每個文件可能就零點幾M。
比如,為了防止Shuffle階段的數據傾斜我們可以在上面的sql中加上 distribute by rand() ,這樣我們的執行計劃就變成了,
這種情況下,這樣我們的文件數妥妥的就是spark.sql.shuffle.partitions * N,因為rand函數一般會把數據打散的非常均勻。當spark.sql.shuffle.partitions設置過大時,小文件問題就產生了;當spark.sql.shuffle.partitions設置過小時,任務的並行度就下降了,性能隨之受到影響。
最理想的情況,當然是根據分區欄位進行shuffle,在上面的sql中加上 distribute by ss_sold_date_sk 。 把同一分區的記錄都哈希到同一個分區中去,由一個Spark的Task進行寫棗租察入,這樣的話只會產生N個文件,在我們的case中store_sales,在1825個分區下各型簡種生成了一個數據文件。
但是這種情況下也容易出現數據傾斜的問題,比如雙11的銷售數據就很容易在這種情況下發生傾斜。
前面已經提到根據分區欄位進行分區,除非每個分區下本身的數據較少,分區欄位選擇不合理,那麼小文件問題基本上就不存在了,但是也有可能由於shuffle引入新的數據傾斜問題。
我們首先可以嘗試是否可以將兩者結合使用, 在之前的sql上加上 distribute by ss_sold_date_sk,cast(rand() * 5 as int) , 這個類似於我們處理數據傾斜問題時候給欄位加上後綴的形式。如,
按照之前的推算,每個分區下將產生5個文件,同時null值傾斜部分的數據也被打散成五份進行計算,緩解了數據傾斜的問題 ,我們最終將得到1825 *5=9105個文件,如下所示
如果我們將5改得更小,文件數也會越少,但相應的傾斜key的計算時間也會上去。
在我們知道那個分區鍵傾斜的情況下,我們也可以將入庫的SQL拆成幾個部分,比如我們store_sales是因為null值傾斜,我們就可以通過 where ss_sold_date_sk is not null 和 where ss_sold_date_sk is null 將原始數據分成兩個部分。前者可以基於分區欄位進行分區,如 distribute by ss_sold_date_sk ;後者可以基於隨機值進行分區, distribute by cast(rand() * 5 as int) , 這樣可以靜態的將null值部分分成五個文件。
對於傾斜部分的數據,我們可以開啟Spark SQL的自適應功能, spark.sql.adaptive.enabled=true 來動態調整每個相當於Spark的rece端task處理的數據量,這樣我們就不需要人為的感知隨機值的規模了,我們可以直接
然後Spark在Shuffle 階段會自動的幫我們將數據盡量的凳茄合並成 spark.sql.adaptive.shuffle.targetPostShuffleInputSize (默認64m)的大小,以減少輸出端寫文件線程的總量,最後減少個數。
對於 spark.sql.adaptive.shuffle.targetPostShuffleInputSize 參數而言,我們也可以設置成為 dfs.block.size 的大小,這樣可以做到和塊對齊,文件大小可以設置的最為合理。
在我們的 猛獁大數據平台 上面,隨便的建立幾個SQL作業,不用會Spark也可以用SQL把大數據玩得666!
雙擊每個工作節點,我們也可以對我們的SQL作業進行參數的調整
選中我們對應的實驗組,點擊執行後,可以查看任務的運行狀態。
從各組的實驗結果來看
實驗組一的小文件控制還是可喜可賀的。對於我們1t的tpcds測試數據,null值分區欄位下只有40個文件,其他每個數據分區也只有一個數據文件,總目錄1825,總文件數1863. 在解決數據傾斜問題的基礎上,也只比純按照分區欄位進行distibute by多了39個文件。
本文講述的是如何在純寫SQL的場景下,如何用Spark SQL做數據導入時候,控制小文件的數量。
⑨ 如何使用 Spark SQL
一、啟動方法
/data/spark-1.4.0-bin-cdh4/bin/spark-sql --master spark://master:7077 --total-executor-cores 10 --executor-memory 1g --executor-cores 2
註:/data/spark-1.4.0-bin-cdh4/為spark的安裝路徑
/data/spark-1.4.0-bin-cdh4/bin/spark-sql –help 查看啟動選項
--master MASTER_URL 指定master url
--executor-memory MEM 每個executor的內存,默認為1G
--total-executor-cores NUM 所有executor的總核數
-e <quoted-query-string> 直接執行查詢SQL
-f <filename> 以文件方式批量執行SQL
二、Spark sql對hive支持的功能
1、查詢語句:SELECT GROUP BY ORDER BY CLUSTER BY SORT BY
2、hive操作運算:
1) 關系運算:= ==, <>, <, >, >=, <=
2) 算術運算:+, -, *, /, %
3) 邏輯運算:AND, &&, OR, ||
4) 復雜的數據結構
5) 數學函數:(sign, ln, cos, etc)
6) 字元串函數:
3、 UDF
4、 UDAF
5、 用戶定義的序列化格式
6、join操作:JOIN {LEFT|RIGHT|FULL} OUTER JOIN LEFT SEMI JOIN CROSS JOIN
7、 unions操作:
8、 子查詢: SELECT col FROM ( SELECT a + b AS col from t1) t2
9、Sampling
10、 Explain
11、 分區表
12、 視圖
13、 hive ddl功能:CREATE TABLE、CREATE TABLE AS SELECT、ALTER TABLE
14、 支持的數據類型:TINYINT SMALLINT INT BIGINT BOOLEAN FLOAT DOUBLE STRING BINARY TIMESTAMPDATE ARRAY MAP STRUCT
三、Spark sql 在客戶端編程方式進行查詢數據
1、啟動spark-shell
./spark-shell --master spark://master:7077 --total-executor-cores 10 --executor-memory 1g --executor-cores 2
2、編寫程序
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.json("../examples/src/main/resources/people.json")
查看所有數據:df.show()
查看錶結構:df.printSchema()
只看name列:df.select("name").show()
對數據運算:df.select(df("name"), df("age") + 1).show()
過濾數據:df.filter(df("age") > 21).show()
分組統計:df.groupBy("age").count().show()
1、查詢txt數據
import sqlContext.implicits._
case class Person(name: String, age: Int)
val people = sc.textFile("../examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
2、parquet文件
val df = sqlContext.read.load("../examples/src/main/resources/users.parquet")
3、hdfs文件
val df = sqlContext.read.load("hdfs://namenode.Hadoop:9000/user/hive/warehouse/spark_test.db/test_parquet/part-r-00001.gz.parquet")
4、保存查詢結果數據
val df = sqlContext.read.load("../examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet「)
四、Spark sql性能調優
緩存數據表:sqlContext.cacheTable("tableName")
取消緩存表:sqlContext.uncacheTable("tableName")
spark.sql.inMemoryColumnarStorage.compressedtrue當設置為true時,Spark SQL將為基於數據統計信息的每列自動選擇一個壓縮演算法。
spark.sql.inMemoryColumnarStorage.batchSize10000柱狀緩存的批數據大小。更大的批數據可以提高內存的利用率以及壓縮效率,但有OOMs的風險