『壹』 Ku:Spark sql操作Ku
摘要: Spark SQL , Ku
參考 https://github.com/xieenze/SparkOnKu/blob/master/src/main/scala/com/spark/test/KuCRUD.scala
引入 spark-core_2.11 , spark-sql_2.11 , ku-spark2_2.11 , hadoop-client 依賴包
指定 ku.master" , ku.table ,如果讀取超時加入 ku.operation.timeout.ms 參數
或者
寫入數據可以使用dataframe的 write 方法,也可以使用 kuContext 的 updateRows , insertRows , upsertRows , insertIgnoreRows 方法
直接調用dataframe的write方法指定 ku.master , ku.table ,只支持 append 模式,對已有key的數據自動更新
調用kuContext的 upsertRows 方法,效果和dataframe調用write append模式一樣
調用kuContext insertRows , insertIgnoreRows 方法,如果插入的數據key已存在insertRows直接報錯,insertIgnoreRows忽略已存在的key,只插入不存在的key
調用kuContext updateRows 方法,對已經存在的key數據做更新,如果key不存在直接報錯
使用已有dataframe的schema建表
使用 StructType 自定義schema
刪除表和判斷表是否存在
『貳』 如何使用 Spark SQL
一、啟動方法
/data/spark-1.4.0-bin-cdh4/bin/spark-sql --master spark://master:7077 --total-executor-cores 10 --executor-memory 1g --executor-cores 2
註:/data/spark-1.4.0-bin-cdh4/為spark的安裝路徑
/data/spark-1.4.0-bin-cdh4/bin/spark-sql –help 查看啟動選項
--master MASTER_URL 指定master url
--executor-memory MEM 每個executor的內存,默認為1G
--total-executor-cores NUM 所有executor的總核數
-e <quoted-query-string> 直接執行查詢SQL
-f <filename> 以文件方式批量執行SQL
二、Spark sql對hive支持的功能
1、查詢語句:SELECT GROUP BY ORDER BY CLUSTER BY SORT BY
2、hive操作運算:
1) 關系運算:= ==, <>, <, >, >=, <=
2) 算術運算:+, -, *, /, %
3) 邏輯運算:AND, &&, OR, ||
4) 復雜的數據結構
5) 數學函數:(sign, ln, cos, etc)
6) 字元串函數:
3、 UDF
4、 UDAF
5、 用戶定義的序列化格式
6、join操作:JOIN {LEFT|RIGHT|FULL} OUTER JOIN LEFT SEMI JOIN CROSS JOIN
7、 unions操作:
8、 子查詢: SELECT col FROM ( SELECT a + b AS col from t1) t2
9、Sampling
10、 Explain
11、 分區表
12、 視圖
13、 hive ddl功能:CREATE TABLE、CREATE TABLE AS SELECT、ALTER TABLE
14、 支持的數據類型:TINYINT SMALLINT INT BIGINT BOOLEAN FLOAT DOUBLE STRING BINARY TIMESTAMPDATE ARRAY MAP STRUCT
三、Spark sql 在客戶端編程方式進行查詢數據
1、啟動spark-shell
./spark-shell --master spark://master:7077 --total-executor-cores 10 --executor-memory 1g --executor-cores 2
2、編寫程序
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.json("../examples/src/main/resources/people.json")
查看所有數據:df.show()
查看錶結構:df.printSchema()
只看name列:df.select("name").show()
對數據運算:df.select(df("name"), df("age") + 1).show()
過濾數據:df.filter(df("age") > 21).show()
分組統計:df.groupBy("age").count().show()
1、查詢txt數據
import sqlContext.implicits._
case class Person(name: String, age: Int)
val people = sc.textFile("../examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
2、parquet文件
val df = sqlContext.read.load("../examples/src/main/resources/users.parquet")
3、hdfs文件
val df = sqlContext.read.load("hdfs://namenode.Hadoop:9000/user/hive/warehouse/spark_test.db/test_parquet/part-r-00001.gz.parquet")
4、保存查詢結果數據
val df = sqlContext.read.load("../examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet「)
四、Spark sql性能調優
緩存數據表:sqlContext.cacheTable("tableName")
取消緩存表:sqlContext.uncacheTable("tableName")
spark.sql.inMemoryColumnarStorage.compressedtrue當設置為true時,Spark SQL將為基於數據統計信息的每列自動選擇一個壓縮演算法。
spark.sql.inMemoryColumnarStorage.batchSize10000柱狀緩存的批數據大小。更大的批數據可以提高內存的利用率以及壓縮效率,但有OOMs的風險
『叄』 Spark SQL CBO 基於代價的優化
Spark CBO 背景
本文將介紹 CBO,它充分考慮了數據本身的特點(如大小、分布)以及操作運算元的特點(中間結果集的分布及大小)及代價,從而更好的選擇執行代價最小的物理執行計劃,即 SparkPlan。
Spark CBO 原理
CBO 原理是計算所有可能的物理計劃的代價,並挑選出代價最小的物理執行計劃。其核心在於評估一個給定的物理執行計劃的代價。
物理執行計劃是一個樹狀結構,其代價等於每個執行節點的代價總合,如下圖所示。
而每個執行節點的代價,分為兩個部分
每個操作運算元的代價相對固定,可用規則來描述。而執行節點輸出數據集的大小與分布,分為兩個部分:1) 初始數據集,也即原始表,其數據集的大小與分布可直接通過統計得到;2)中間節點輸出數據集的大小與分布可由其輸入數據集的信息與操作本身的特點推算。
所以,最終主要需要解決兩個問題
Statistics 收集
通過如下 SQL 語句,可計算出整個表的記錄總數以及總大小
從如下示例中,Statistics 一行可見, customer 表數據總大小為 37026233 位元組,即 35.3MB,總記錄數為 28萬,與事實相符。
通過如下 SQL 語句,可計算出指定列的統計信息
從如下示例可見,customer 表的 c_customer_sk 列最小值為 1, 最大值為 280000,null 值個數為 0,不同值個數為 274368,平均列長度為 8,最大列長度為 8。
除上述示例中的統計信息外,Spark CBO 還直接等高直方圖。在上例中,histogram 為 NULL。其原因是,spark.sql.statistics.histogram.enabled 默認值為 false,也即 ANALYZE 時默認不計算及存儲 histogram。
下例中,通過 SET spark.sql.statistics.histogram.enabled=true; 啟用 histogram 後,完整的統計信息如下。
從上圖可見,生成的 histogram 為 equal-height histogram,且高度為 1102.36,bin 數為 254。其中 bin 個數可由 spark.sql.statistics.histogram.numBins 配置。對於每個 bin,勻記錄其最小值,最大值,以及 distinct count。
值得注意的是,這里的 distinct count 並不是精確值,而是通過 HyperLogLog 計算出來的近似值。使用 HyperLogLog 的原因有二
運算元對數據集影響估計
對於中間運算元,可以根據輸入數據集的統計信息以及運算元的特性,可以估算出輸出數據集的統計結果。
本節以 Filter 為例說明運算元對數據集的影響。
對於常見的 Column A < value B Filter,可通過如下方式估算輸出中間結果的統計信息
上述估算的前提是,欄位 A 數據均勻分布。但很多時候,數據分布並不均勻,且當數據傾斜嚴重是,上述估算誤差較大。此時,可充分利用 histogram 進行更精確的估算
啟用 Historgram 後,Filter Column A < value B 的估算方法為
在上圖中,B.value = 15,A.min = 0,A.max = 32,bin 個數為 10。Filter 後 A.ndv = ndv(<B.value) = ndv(<15)。該值可根據 A < 15 的 5 個 bin 的 ndv 通過 HyperLogLog 合並而得,無須重新計算所有 A < 15 的數據。
運算元代價估計
SQL 中常見的操作有 Selection(由 select 語句表示),Filter(由 where 語句表示)以及笛卡爾乘積(由 join 語句表示)。其中代價最高的是 join。
Spark SQL 的 CBO 通過如下方法估算 join 的代價
其中 rows 即記錄行數代表了 CPU 代價,size 代表了 IO 代價。weight 由 *spark.sql.cbo.joinReorder.card.weight *決定,其默認值為 0.7。
Build側選擇
對於兩表Hash Join,一般選擇小表作為build size,構建哈希表,另一邊作為 probe side。未開啟 CBO 時,根據表原始數據大小選擇 t2 作為build side
而開啟 CBO 後,基於估計的代價選擇 t1 作為 build side。更適合本例
優化 Join 類型
Spark SQL 中,Join 可分為 Shuffle based Join 和 BroadcastJoin。Shuffle based Join 需要引入 Shuffle,代價相對較高。BroadcastJoin 無須 Join,但要求至少有一張表足夠小,能通過 Spark 的 Broadcast 機制廣播到每個 Executor 中。
在不開啟 CBO 中,Spark SQL 通過 spark.sql.autoBroadcastJoinThreshold 判斷是否啟用 BroadcastJoin。其默認值為 10485760 即 10 MB。
並且該判斷基於參與 Join 的表的原始大小。
在下圖示例中,Table 1 大小為 1 TB,Table 2 大小為 20 GB,因此在對二者進行 join 時,由於二者都遠大於自動 BroatcastJoin 的閾值,因此 Spark SQL 在未開啟 CBO 時選用 SortMergeJoin 對二者進行 Join。
而開啟 CBO 後,由於 Table 1 經過 Filter 1 後結果集大小為 500 GB,Table 2 經過 Filter 2 後結果集大小為 10 MB 低於自動 BroatcastJoin 閾值,因此 Spark SQL 選用 BroadcastJoin。
優化多表 Join 順序
未開啟 CBO 時,Spark SQL 按 SQL 中 join 順序進行 Join。極端情況下,整個 Join 可能是 left-deep tree。在下圖所示 TPC-DS Q25 中,多路 Join 存在如下問題,因此耗時 241 秒。
開啟 CBO 後, Spark SQL 將執行計劃優化如下
優化後的 Join 有如下優勢,因此執行時間降至 71 秒
總結
5萬人關注的大數據成神之路,不來了解一下嗎?
5萬人關注的大數據成神之路,真的不來了解一下嗎?
5萬人關注的大數據成神之路,確定真的不來了解一下嗎?
『肆』 2019-03-05 SparkSQL集群性能調優 CheatSheet
0.買高性能機器,增加節點
1.設置磁碟文件預讀值大小為16384,使用linux命令:
echo 16384 > /sys/block/{磁碟名}/queue/read_ahead_kb
2. Spark 任務序列化只支持JavaSerializer,數據序列化支持JavaSerializer和 KryoSerializer 。KryoSerializer能達到JavaSerializer的十倍。
3.在spark.driver.extraJavaOptions和spark.executor.extraJavaOptions配置項中添加參數:" -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps ",如果頻繁出現Full GC,需要優化GC。把RDD做Cache操作,通過日誌查看RDD在內存中的大小,如果數據太大,需要改變RDD的存儲級別來優化。
4.一般並行度設置為集群CPU總和的2-3倍
5.大表和小表做join操作時可以把小表Broadcast到各個節點,從而就可以把join操作轉變成普通的操作,減少了shuffle操作。
6. 合理設計DAG,減少shuffle //TODO
7.使用 mapPartitions 可以更靈活地操作數據,例如對一個很大的數據求TopN,當N不是很大時,可以先使用mapPartitions對每個partition求TopN,collect結果到本地之後再做排序取TopN。這樣相比直接對全量數據做排序取TopN效率要高很多。
8.當之前的操作有很多filter時,使用 coalesce 減少空運行的任務數量
9.當任務數過大時候Shuffle壓力太大導致程序掛住不動,或者出現linux資源受限的問題。此時需要對數據重新進行分區,使用 repartition 。
10.配置多個磁碟給 localDir ,shuffle時寫入數據速度增快
11. 別collect大數據量,數據會回到driver端,容易OOM。非要collect,請配置 spark.sql.bigdata.thriftServer.useHdfsCollect 為true,會存在hdfs再讀
12.盡量用receByKey,會在Map端做本地聚合
13. broadcase set/map而不是Iterator, set/map 查詢效率O(1) ,iteratorO(n)
14. 數據發生傾斜,repartition大法 ,查出key,salt it
15.使用Hash Shuffle時,通過設置 spark.shuffle.consolidateFiles 為true,來合並shuffle中間文件,減少shuffle文件的數量,減少文件IO操作以提升性能
16.Spark SQL 小表join,把小表broadcast出去。配置 spark.sql.autoBroadcastJoinThreshold 和 spark.sql.bigdata.useExecutorBroadcast 。小表在join 右端。
17.SparkSQL數據傾斜,配置 spark.sql.planner.skewJoin 和 spark.sql.planner.skewJoin.threshold
18. SparkSQL 小文件,配置 spark.sql.small.file.combine 和 spark.sql.small.file.split.size