當前位置:首頁 » 編程語言 » sparksql實例
擴展閱讀
webinf下怎麼引入js 2023-08-31 21:54:13
堡壘機怎麼打開web 2023-08-31 21:54:11

sparksql實例

發布時間: 2023-02-07 05:01:33

㈠ Ku:Spark sql操作Ku

摘要: Spark SQL , Ku

參考 https://github.com/xieenze/SparkOnKu/blob/master/src/main/scala/com/spark/test/KuCRUD.scala

引入 spark-core_2.11 , spark-sql_2.11 , ku-spark2_2.11 , hadoop-client 依賴包

指定 ku.master" , ku.table ,如果讀取超時加入 ku.operation.timeout.ms 參數

或者

寫入數據可以使用dataframe的 write 方法,也可以使用 kuContext 的 updateRows , insertRows , upsertRows , insertIgnoreRows 方法

直接調用dataframe的write方法指定 ku.master , ku.table ,只支持 append 模式,對已有key的數據自動更新

調用kuContext的 upsertRows 方法,效果和dataframe調用write append模式一樣

調用kuContext insertRows , insertIgnoreRows 方法,如果插入的數據key已存在insertRows直接報錯,insertIgnoreRows忽略已存在的key,只插入不存在的key

調用kuContext updateRows 方法,對已經存在的key數據做更新,如果key不存在直接報錯

使用已有dataframe的schema建表

使用 StructType 自定義schema

刪除表和判斷表是否存在

㈡ Spark SQL(十):Hive On Spark

Hive是目前大數據領域,事實上的SQL標准。其底層默認是基於MapRece實現的,但是由於MapRece速度實在比較慢,因此這幾年,陸續出來了新的SQL查詢引擎,包括Spark SQL,Hive On Tez,Hive On Spark等。

Spark SQL與Hive On Spark是不一樣的。Spark SQL是Spark自己研發出來的針對各種數據源,包括Hive、JSON、Parquet、JDBC、RDD等都可以執行查詢的,一套基於Spark計算引擎的查詢引擎。因此它是Spark的一個項目,只不過提供了針對Hive執行查詢的工功能而已,適合在一些使用Spark技術棧的大數據應用類系統中使用。

而Hive On Spark,是Hive的一個項目,它是將Spark作為底層的查詢引擎(不通過MapRece作為唯一的查詢引擎)。Hive On Spark,只適用於Hive,在可預見的未來,很有可能Hive默認的底層引擎就從MapRece切換為Spark了;適合於將原有的Hive數據倉庫以及數據統計分析替換為Spark引擎,作為全公司通用的大數據統計分析引擎。

Hive On Spark做了一些優化:
1、Map Join
Spark SQL默認對join是支持使用broadcast機制將小表廣播到各個節點上,以進行join的。但是問題是,這會給Driver和Worker帶來很大的內存開銷。因為廣播的數據要一直保留在Driver內存中。所以目前採取的是,類似乎MapRece的Distributed Cache機制,即提高HDFS replica factor的復制因子,以讓數據在每個計算節點上都有一個備份,從而可以在本地進行數據讀取。

2、Cache Table
對於某些需要對一張表執行多次操作的場景,Hive On Spark內部做了優化,即將要多次操作的表cache到內存中,以便於提升性能。但是這里要注意,並不是對所有的情況都會自動進行cache。所以說,Hive On Spark還有很多不完善的地方。

Hive QL語句 =>
語法分析 => AST =>
生成邏輯執行計劃 => Operator Tree =>
優化邏輯執行計劃 => Optimized Operator Tree =>
生成物理執行計劃 => Task Tree =>
優化物理執行計劃 => Optimized Task Tree =>
執行優化後的Optimized Task Tree

㈢ 如何使用 Spark SQL

一、啟動方法
/data/spark-1.4.0-bin-cdh4/bin/spark-sql --master spark://master:7077 --total-executor-cores 10 --executor-memory 1g --executor-cores 2

註:/data/spark-1.4.0-bin-cdh4/為spark的安裝路徑

/data/spark-1.4.0-bin-cdh4/bin/spark-sql –help 查看啟動選項

--master MASTER_URL 指定master url
--executor-memory MEM 每個executor的內存,默認為1G
--total-executor-cores NUM 所有executor的總核數
-e <quoted-query-string> 直接執行查詢SQL

-f <filename> 以文件方式批量執行SQL

二、Spark sql對hive支持的功能

1、查詢語句:SELECT GROUP BY ORDER BY CLUSTER BY SORT BY
2、hive操作運算:
1) 關系運算:= ==, <>, <, >, >=, <=
2) 算術運算:+, -, *, /, %
3) 邏輯運算:AND, &&, OR, ||
4) 復雜的數據結構
5) 數學函數:(sign, ln, cos, etc)
6) 字元串函數:
3、 UDF
4、 UDAF

5、 用戶定義的序列化格式
6、join操作:JOIN {LEFT|RIGHT|FULL} OUTER JOIN LEFT SEMI JOIN CROSS JOIN
7、 unions操作:
8、 子查詢: SELECT col FROM ( SELECT a + b AS col from t1) t2
9、Sampling
10、 Explain
11、 分區表
12、 視圖
13、 hive ddl功能:CREATE TABLE、CREATE TABLE AS SELECT、ALTER TABLE

14、 支持的數據類型:TINYINT SMALLINT INT BIGINT BOOLEAN FLOAT DOUBLE STRING BINARY TIMESTAMPDATE ARRAY MAP STRUCT

三、Spark sql 在客戶端編程方式進行查詢數據
1、啟動spark-shell
./spark-shell --master spark://master:7077 --total-executor-cores 10 --executor-memory 1g --executor-cores 2
2、編寫程序
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.json("../examples/src/main/resources/people.json")
查看所有數據:df.show()
查看錶結構:df.printSchema()
只看name列:df.select("name").show()
對數據運算:df.select(df("name"), df("age") + 1).show()
過濾數據:df.filter(df("age") > 21).show()

分組統計:df.groupBy("age").count().show()

1、查詢txt數據
import sqlContext.implicits._
case class Person(name: String, age: Int)
val people = sc.textFile("../examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
2、parquet文件
val df = sqlContext.read.load("../examples/src/main/resources/users.parquet")
3、hdfs文件

val df = sqlContext.read.load("hdfs://namenode.Hadoop:9000/user/hive/warehouse/spark_test.db/test_parquet/part-r-00001.gz.parquet")
4、保存查詢結果數據
val df = sqlContext.read.load("../examples/src/main/resources/users.parquet")

df.select("name", "favorite_color").write.save("namesAndFavColors.parquet「)

四、Spark sql性能調優

緩存數據表:sqlContext.cacheTable("tableName")

取消緩存表:sqlContext.uncacheTable("tableName")

spark.sql.inMemoryColumnarStorage.compressedtrue當設置為true時,Spark SQL將為基於數據統計信息的每列自動選擇一個壓縮演算法。
spark.sql.inMemoryColumnarStorage.batchSize10000柱狀緩存的批數據大小。更大的批數據可以提高內存的利用率以及壓縮效率,但有OOMs的風險

㈣ Spark Sql 函數使用

round -  保留數據精度 

如 round(col("col1"),0) 對應數值為 21.23 -> 21.0 ;21.73 -> 22.0

如 round(col("col1"),1) 對應數值為 21.23 -> 21.2

如 round(col("col1"),-1) 對應數值為 21.23 -> 20.0

㈤ AWS Glue中使用Spark SQL

AWS Glue 是一項完全託管的提取、轉換和載入 (ETL) 服務,讓客戶能夠輕松准備和載入數據進行分析。您只需在 AWS 管理控制台中單擊幾次,即可創建並運行 ETL 作業。您只需將 AWS Glue 指向存儲在 AWS 上的數據,AWS Glue 便會發現您的數據,並將關聯的元數據(例如表定義和架構)存儲到 AWS Glue 數據目錄中。存入目錄後,您的數據可立即供 ETL 搜索、查詢和使用。

Glue提供了DynamicFrame來操作數據,但如果用戶習慣用Spark SQL來做ETL,那是否可行呢? 本文就做了一個嘗試:

首先我們創建一個基本的Glue Job,選擇Spark,這里要注意在Job parameters裡面加上

--enable-glue-datacatalog = true

這是為了在Spark SQL中使用Glue的元數據。

之後其他步驟都隨意選擇,進入腳本編輯環境,將腳本替換成如下:

這里做了一個簡單的insert overwrite操作,從表testdata1中選擇數據到表table_6。

嘗試運行Job,等待7-8分鍾後就可以看到任務完成了。此時去檢查table_6的數據,已經有了。

㈥ spark sql 2.3 源碼解讀 - Execute (7)

終於到了最後一步執行了:

最關鍵的兩個函數便是 doPrepare和 doExecute了。

還是以上一章的sql語句為例,其最終生成的sparkplan為:

看一下SortExec的doPrepare 和 doExecute方法:

下面看child也就是ShuffleExchangeExec:

先看沒有exchangeCoordinator的情況,

首先執行:

上面的方法會返回一個ShuffleDependency,ShuffleDependency中最重要的是rddWithPartitionIds,它決定了每一條InternalRow shuffle後的partition id:

接下來:

返回結果是ShuffledRowRDD:

CoalescedPartitioner的邏輯:

再看有exchangeCoordinator的情況:

同樣返回的是ShuffledRowRDD:

再看doEstimationIfNecessary:

estimatePartitionStartIndices 函數得到了 partitionStartIndices:

有exchangeCoordinator的情況就生成了partitionStartIndices,從而對分區進行了調整。

最後來一個例子:

未開啟exchangeCoordinator的plan:

開啟exchangeCoordinator的plan:

不同之處是 兩個Exchange都帶了coordinator,且都是同一個coordinator。

執行withExchangeCoordinator前:

執行withExchangeCoordinator後:

生成了coordinator,且執行了 doPrepare後,可以看到兩個exchange都向其注冊了。

doExecute後:

原先的numPartitions是200,經過執行後,生成的partitionStartIndices為[1],也就是只有1個partition,顯然在測試數據量很小的情況下,1個partition是更為合理的。這就是ExchangeCoordinator的功勞。

execute 最終的輸出是rdd,剩下的結果便是spark對rdd的運算了。其實 spark sql 最終的目標便也是生成rdd,交給spark core來運算。

spark sql的介紹到這里就結束了。

㈦ spark從hive數據倉庫中讀取的數據可以使用sparksql進行查詢嗎

1、為了讓Spark能夠連接到Hive的原有數據倉庫,我們需要將Hive中的hive-site.xml文件拷貝到Spark的conf目錄下,這樣就可以通過這個配置文件找到Hive的元數據以及數據存放。
在這里由於我的Spark是自動安裝和部署的,因此需要知道CDH將hive-site.xml放在哪裡。經過摸索。該文件默認所在的路徑是:/etc/hive/conf 下。
同理,spark的conf也是在/etc/spark/conf。
此時,如上所述,將對應的hive-site.xml拷貝到spark/conf目錄下即可
如果Hive的元數據存放在Mysql中,我們還需要准備好Mysql相關驅動,比如:mysql-connector-java-5.1.22-bin.jar。
2、編寫測試代碼
val conf=new SparkConf().setAppName("Spark-Hive").setMaster("local")
val sc=new SparkContext(conf)

//create hivecontext
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' ") //這里需要注意數據的間隔符

sqlContext.sql("LOAD DATA INPATH '/user/liujiyu/spark/kv1.txt' INTO TABLE src ");

sqlContext.sql(" SELECT * FROM jn1").collect().foreach(println)

sc.stop()

3、下面列舉一下出現的問題:
(1)如果沒有將hive-site.xml拷貝到spark/conf目錄下,會出現:

分析:從錯誤提示上面就知道,spark無法知道hive的元數據的位置,所以就無法實例化對應的client。
解決的辦法就是必須將hive-site.xml拷貝到spark/conf目錄下
(2)測試代碼中沒有加sc.stop會出現如下錯誤:
ERROR scheler.LiveListenerBus: Listener EventLoggingListener threw an exception
java.lang.reflect.InvocationTargetException
在代碼最後一行添加sc.stop()解決了該問題。

㈧ Spark SQL CBO 基於代價的優化

Spark CBO 背景

本文將介紹 CBO,它充分考慮了數據本身的特點(如大小、分布)以及操作運算元的特點(中間結果集的分布及大小)及代價,從而更好的選擇執行代價最小的物理執行計劃,即 SparkPlan。

Spark CBO 原理

CBO 原理是計算所有可能的物理計劃的代價,並挑選出代價最小的物理執行計劃。其核心在於評估一個給定的物理執行計劃的代價。

物理執行計劃是一個樹狀結構,其代價等於每個執行節點的代價總合,如下圖所示。

而每個執行節點的代價,分為兩個部分

每個操作運算元的代價相對固定,可用規則來描述。而執行節點輸出數據集的大小與分布,分為兩個部分:1) 初始數據集,也即原始表,其數據集的大小與分布可直接通過統計得到;2)中間節點輸出數據集的大小與分布可由其輸入數據集的信息與操作本身的特點推算。

所以,最終主要需要解決兩個問題

Statistics 收集

通過如下 SQL 語句,可計算出整個表的記錄總數以及總大小

從如下示例中,Statistics 一行可見, customer 表數據總大小為 37026233 位元組,即 35.3MB,總記錄數為 28萬,與事實相符。

通過如下 SQL 語句,可計算出指定列的統計信息

從如下示例可見,customer 表的 c_customer_sk 列最小值為 1, 最大值為 280000,null 值個數為 0,不同值個數為 274368,平均列長度為 8,最大列長度為 8。

除上述示例中的統計信息外,Spark CBO 還直接等高直方圖。在上例中,histogram 為 NULL。其原因是,spark.sql.statistics.histogram.enabled 默認值為 false,也即 ANALYZE 時默認不計算及存儲 histogram。

下例中,通過 SET spark.sql.statistics.histogram.enabled=true; 啟用 histogram 後,完整的統計信息如下。

從上圖可見,生成的 histogram 為 equal-height histogram,且高度為 1102.36,bin 數為 254。其中 bin 個數可由 spark.sql.statistics.histogram.numBins 配置。對於每個 bin,勻記錄其最小值,最大值,以及 distinct count。

值得注意的是,這里的 distinct count 並不是精確值,而是通過 HyperLogLog 計算出來的近似值。使用 HyperLogLog 的原因有二

運算元對數據集影響估計

對於中間運算元,可以根據輸入數據集的統計信息以及運算元的特性,可以估算出輸出數據集的統計結果。

本節以 Filter 為例說明運算元對數據集的影響。

對於常見的 Column A < value B Filter,可通過如下方式估算輸出中間結果的統計信息

上述估算的前提是,欄位 A 數據均勻分布。但很多時候,數據分布並不均勻,且當數據傾斜嚴重是,上述估算誤差較大。此時,可充分利用 histogram 進行更精確的估算

啟用 Historgram 後,Filter Column A < value B 的估算方法為

在上圖中,B.value = 15,A.min = 0,A.max = 32,bin 個數為 10。Filter 後 A.ndv = ndv(<B.value) = ndv(<15)。該值可根據 A < 15 的 5 個 bin 的 ndv 通過 HyperLogLog 合並而得,無須重新計算所有 A < 15 的數據。

運算元代價估計

SQL 中常見的操作有 Selection(由 select 語句表示),Filter(由 where 語句表示)以及笛卡爾乘積(由 join 語句表示)。其中代價最高的是 join。

Spark SQL 的 CBO 通過如下方法估算 join 的代價

其中 rows 即記錄行數代表了 CPU 代價,size 代表了 IO 代價。weight 由 *spark.sql.cbo.joinReorder.card.weight *決定,其默認值為 0.7。

Build側選擇

對於兩表Hash Join,一般選擇小表作為build size,構建哈希表,另一邊作為 probe side。未開啟 CBO 時,根據表原始數據大小選擇 t2 作為build side

而開啟 CBO 後,基於估計的代價選擇 t1 作為 build side。更適合本例

優化 Join 類型

Spark SQL 中,Join 可分為 Shuffle based Join 和 BroadcastJoin。Shuffle based Join 需要引入 Shuffle,代價相對較高。BroadcastJoin 無須 Join,但要求至少有一張表足夠小,能通過 Spark 的 Broadcast 機制廣播到每個 Executor 中。

在不開啟 CBO 中,Spark SQL 通過 spark.sql.autoBroadcastJoinThreshold 判斷是否啟用 BroadcastJoin。其默認值為 10485760 即 10 MB。

並且該判斷基於參與 Join 的表的原始大小。

在下圖示例中,Table 1 大小為 1 TB,Table 2 大小為 20 GB,因此在對二者進行 join 時,由於二者都遠大於自動 BroatcastJoin 的閾值,因此 Spark SQL 在未開啟 CBO 時選用 SortMergeJoin 對二者進行 Join。

而開啟 CBO 後,由於 Table 1 經過 Filter 1 後結果集大小為 500 GB,Table 2 經過 Filter 2 後結果集大小為 10 MB 低於自動 BroatcastJoin 閾值,因此 Spark SQL 選用 BroadcastJoin。

優化多表 Join 順序

未開啟 CBO 時,Spark SQL 按 SQL 中 join 順序進行 Join。極端情況下,整個 Join 可能是 left-deep tree。在下圖所示 TPC-DS Q25 中,多路 Join 存在如下問題,因此耗時 241 秒。

開啟 CBO 後, Spark SQL 將執行計劃優化如下

優化後的 Join 有如下優勢,因此執行時間降至 71 秒

總結

5萬人關注的大數據成神之路,不來了解一下嗎?

5萬人關注的大數據成神之路,真的不來了解一下嗎?

5萬人關注的大數據成神之路,確定真的不來了解一下嗎?

㈨ 可能是全網最詳細的 Spark Sql Aggregate 源碼剖析

縱觀 Spark Sql 源碼,聚合的實現是其中較為復雜的部分,本文希望能以例子結合流程圖的方式來說清楚整個過程。這里僅關注 Aggregate 在物理執行計劃相關的內容,之前的 parse、analyze 及 optimize 階段暫不做分析。在 Spark Sql 中,有一個專門的 Aggregation strategy 用來處理聚合,我們先來看看這個策略。

本文暫不討論 distinct Aggregate 的實現(有興趣的可以看看另一篇博文 https://www.jianshu.com/p/77e0a70db8cd ),我們來看看 AggUtils#planAggregateWithoutDistinct 是如何生成聚合的物理執行計劃的

創建聚合分為兩個階段:

AggregateExpression 共有以下幾種 mode:

Q:是否支持使用 hash based agg 是如何判斷的?

摘自我另一篇文章: https://www.jianshu.com/p/77e0a70db8cd

為了說明最常用也是最復雜的的 hash based agg,本小節暫時將示例 sql 改為

這樣就能進入 HashAggregateExec 的分支

構造函數主要工作就是對 groupingExpressions、aggregateExpressions、aggregateAttributes、resultExpressions 進行了初始化

在 enable code gen 的情況下,會調用 HashAggregateExec#inputRDDs 來生成 RDD,為了分析 HashAggregateExec 是如何生成 RDD 的,我們設置 spark.sql.codegen.wholeStage 為 false 來 disable code gen,這樣就會調用 HashAggregateExec#doExecute 來生成 RDD,如下:

可以看到,關鍵的部分就是根據 child.execute() 生成的 RDD 的每一個 partition 的迭代器轉化生成一個新的 TungstenAggregationIterator ,即 HashAggregateExec 生成的 RDD 的各個 partition。由於 TungstenAggregationIterator 涉及內容非常多,我們單開一大節來進行介紹。

此迭代器:

註:UnsafeKVExternalSorter 的實現可以參考:

UnsafeRow 是 InternalRow(表示一行記錄) 的 unsafe 實現,由原始內存(byte array)而不是 Java 對象支持,由三個區域組成:

使用 UnsafeRow 的收益:

構造函數的主要流程已在上圖中說明,需要注意的是:當內存不足時(畢竟每個 grouping 對應的 agg buffer 直接佔用內存,如果 grouping 非常多,或者 agg buffer 較大,容易出現內存用盡)會從 hash based aggregate 切換為 sort based aggregate(會 spill 數據到磁碟),後文會進行詳述。先來看看最關鍵的 processInputs 方法的實現

上圖中,需要注意的是:hashMap 中 get 一個 groupingKey 對應的 agg buffer 時,若已經存在該 buffer 則直接返回;若不存在,嘗試申請內存新建一個:

上圖中,用於真正處理一條 row 的 AggregationIterator#processRow 還需進一步展開分析。在此之前,我們先來看看 AggregateFunction 的分類

AggregateFunction 可以分為 DeclarativeAggregate 和 ImperativeAggregate 兩大類,具體的聚合函數均為這兩類的子類。

DeclarativeAggregate 是一類直接由 Catalyst 中的 Expressions 構成的聚合函數,主要邏輯通過調用 4 個表達式完成,分別是:

我們再次以容易理解的 Count 來舉例說明:

通常來講,實現一個基於 Expressions 的 DeclarativeAggregate 函數包含以下幾個重要的組成部分:

再來看看 AggregationIterator#processRow

AggregationIterator#processRow 會調用

生成用於處理一行數據(row)的函數

說白了 processRow 生成了函數才是直接用來接受一條 input row 來更新對應的 agg buffer,具體是根據 mode 及 aggExpression 中的 aggFunction 的類型調用其 updateExpressions 或 mergeExpressions 方法:

比如,對於 aggFunction 為 DeclarativeAggregate 類型的 Partial 下的 Count 來說就是調用其 updateExpressions 方法,即:

對於 Final 的 Count 來說就是調用其 mergeExpressions 方法,即:

對於 aggFunction 為 ImperativeAggregate 類型的 Partial 下的 Collect 來說就是調用其 update 方法,即:

對於 Final 的 Collect 來說就是調用其 merge 方法,即:

我們都知道,讀取一個迭代器的數據,是要不斷調用 hasNext 方法進行 check 是否還有數據,當該方法返回 true 的時候再調用 next 方法取得下一條數據。所以要知道如何讀取 TungstenAggregationIterator 的數據,就得分析其這兩個方法。

分為兩種情況,分別是:

Agg 的實現確實復雜,本文雖然篇幅已經很長,但還有很多方面沒有 cover 到,但基本最核心、最復雜的點都詳細介紹了,如果對於未 cover 的部分有興趣,請自行閱讀源碼進行分析~

㈩ Spark SQL 到底怎麼搭建起來

1、spark1.0的包編譯時指定支持hive: ./make-distribution.sh --hadoop 2.3.0-cdh5.0.0 --with-yarn --with-hive --tgz
2、安裝完spark1.0;
3、安裝與hadoop對應的CDH版本的hive;
Spark SQL 支持Hive案例:
1、將hive-site.xml配置文件拷貝到$SPARK_HOME/conf下
hive-site.xml文件內容形如:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://hadoop000:3306/hive?createDatabaseIfNotExist=true</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
</property>

<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
</property>

<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>root</value>
</property>
</configuration>

2、啟動spark: spark-shell
案例來源於spark官方文檔: http://spark.apache.org/docs/latest/sql-programming-guide.html
//創建hiveContext
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

// 隱式轉換
import hiveContext._

//創建hive表
hql("CREATE TABLE IF NOT EXISTS hive.kv_src (key INT, value STRING)")

//載入數據到hive表
hql("LOAD DATA LOCAL INPATH '/home/spark/app/spark-1.0.0-bin-2.3.0-cdh5.0.0/examples/src/main/resources/kv1.txt' INTO TABLE hive.kv_src")

//通過hql查詢
hql("FROM hive.kv_src SELECT key, value").collect().foreach(println)