當前位置:首頁 » 編程語言 » sparksqltop
擴展閱讀
webinf下怎麼引入js 2023-08-31 21:54:13
堡壘機怎麼打開web 2023-08-31 21:54:11

sparksqltop

發布時間: 2023-07-10 20:37:44

Ⅰ 如何使用 Spark sql

一、啟動方法
/data/spark-1.4.0-bin-cdh4/bin/spark-sql --master spark://master:7077 --total-executor-cores 10 --executor-memory 1g --executor-cores 2

註:/data/spark-1.4.0-bin-cdh4/為spark的安裝路徑

/data/spark-1.4.0-bin-cdh4/bin/spark-sql –help 查看啟動選項

--master MASTER_URL 指定master url
--executor-memory MEM 每個executor的內存,默認為1G
--total-executor-cores NUM 所有executor的總核數
-e <quoted-query-string> 直接執行查詢SQL

-f <filename> 以文件方式批量執行SQL

二、Spark sql對hive支持的功能

1、查詢語句:SELECT GROUP BY ORDER BY CLUSTER BY SORT BY
2、hive操作運算:
1) 關系運算:= ==, <>, <, >, >=, <=
2) 算術運算:+, -, *, /, %
3) 邏輯運算:AND, &&, OR, ||
4) 復雜的數據結構
5) 數學函數:(sign, ln, cos, etc)
6) 字元串函數:
3、 UDF
4、 UDAF

5、 用戶定義的序列化格式
6、join操作:JOIN {LEFT|RIGHT|FULL} OUTER JOIN LEFT SEMI JOIN CROSS JOIN
7、 unions操作:
8、 子查詢: SELECT col FROM ( SELECT a + b AS col from t1) t2
9、Sampling
10、 Explain
11、 分區表
12、 視圖
13、 hive ddl功能:CREATE TABLE、CREATE TABLE AS SELECT、ALTER TABLE

14、 支持的數據類型:TINYINT SMALLINT INT BIGINT BOOLEAN FLOAT DOUBLE STRING BINARY TIMESTAMPDATE ARRAY MAP STRUCT

三、Spark sql 在客戶端編程方式進行查詢數據
1、啟動spark-shell
./spark-shell --master spark://master:7077 --total-executor-cores 10 --executor-memory 1g --executor-cores 2
2、編寫程序
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.json("../examples/src/main/resources/people.json")
查看所有數據:df.show()
查看錶結構:df.printSchema()
只看name列:df.select("name").show()
對數據運算:df.select(df("name"), df("age") + 1).show()
過濾數據:df.filter(df("age") > 21).show()

分組統計:df.groupBy("age").count().show()

1、查詢txt數據
import sqlContext.implicits._
case class Person(name: String, age: Int)
val people = sc.textFile("../examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
2、parquet文件
val df = sqlContext.read.load("../examples/src/main/resources/users.parquet")
3、hdfs文件

val df = sqlContext.read.load("hdfs://namenode.Hadoop:9000/user/hive/warehouse/spark_test.db/test_parquet/part-r-00001.gz.parquet")
4、保存查詢結果數據
val df = sqlContext.read.load("../examples/src/main/resources/users.parquet")

df.select("name", "favorite_color").write.save("namesAndFavColors.parquet「)

四、Spark sql性能調優

緩存數據表:sqlContext.cacheTable("tableName")

取消緩存表:sqlContext.uncacheTable("tableName")

spark.sql.inMemoryColumnarStorage.compressedtrue當設置為true時,Spark SQL將為基於數據統計信息的每列自動選擇一個壓縮演算法。
spark.sql.inMemoryColumnarStorage.batchSize10000柱狀緩存的批數據大小。更大的批數據可以提高內存的利用率以及壓縮效率,但有OOMs的風險

Ⅱ Spark-sql讀取hive分區表限制分區過濾條件及限制分區數量

在開發過程中使用spark去讀取hive分區表的過程中(或者使用hive on spark、nodepad開發工具),部分開發人員未注意添加分區屬性過濾導致在執行過程中載入了全量數據,引起任務執行效率低、磁碟IO大量損耗等問題

1、自定義規則CheckPartitionTable類,實現Rule

然後通過此種方法創建SparkSession
2、自定義規則CheckPartitionTable類,實現Rule,將規則類追加致Optimizer.batches: Seq[Batch]中

1、CheckPartitionTable規則執行類,需要通過引入sparkSession從而獲取到引入conf;需要繼承Rule[LogicalPlan];

2、通過splitPredicates方法,分離分區謂詞,得到分區謂詞表達式
在sql解析過程中將謂詞解析為TreeNode,此處採用遞歸的方式獲取分區謂詞

3、判斷是否是分區表,且是否添加分區欄位

4、實現Rule的apply方法

關於spark-sql的主要執行流程及預備知識,可參照我同學的這篇博文 https://www.jianshu.com/p/4cc6797fb9ce

Ⅲ spark sql dataset怎麼做分組排序呢

sparksql怎樣取分組後的topn
Spark SQL 開窗函數
1、Spark 1.5.x版本以後,在Spark SQL和DataFrame中引入了開窗函數,比如最經典的就是我們的row_number(),可以讓我們實現分組取topn的邏輯。
2、做一個案例進行topn的取值(利用Spark的開窗函數),不知道是否還有印象,我們之前在最早的時候,做過topn的計算,當時是非常麻煩的。但是現在用了Spark SQL之後,非常方便。

Ⅳ Ku:Spark SQL操作Ku

摘要: Spark SQL , Ku

參考 https://github.com/xieenze/SparkOnKu/blob/master/src/main/scala/com/spark/test/KuCRUD.scala

引入 spark-core_2.11 , spark-sql_2.11 , ku-spark2_2.11 , hadoop-client 依賴包

指定 ku.master" , ku.table ,如果讀取超時加入 ku.operation.timeout.ms 參數

或者

寫入數據可以使用dataframe的 write 方法,也可以使用 kuContext 的 updateRows , insertRows , upsertRows , insertIgnoreRows 方法

直接調用dataframe的write方法指定 ku.master , ku.table ,只支持 append 模式,對已有key的數據自動更新

調用kuContext的 upsertRows 方法,效果和dataframe調用write append模式一樣

調用kuContext insertRows , insertIgnoreRows 方法,如果插入的數據key已存在insertRows直接報錯,insertIgnoreRows忽略已存在的key,只插入不存在的key

調用kuContext updateRows 方法,對已經存在的key數據做更新,如果key不存在直接報錯

使用已有dataframe的schema建表

使用 StructType 自定義schema

刪除表和判斷表是否存在

Ⅳ Spark SQL(十):Hive On Spark

Hive是目前大數據領域,事實上的SQL標准。其底層默認是基於MapRece實現的,但是由於MapRece速度實在比較慢,因此這幾年,陸續出來了新的SQL查詢引擎,包括Spark SQL,Hive On Tez,Hive On Spark等。

Spark SQL與Hive On Spark是不一樣的。Spark SQL是Spark自己研發出來的針對各種數據源,包括Hive、JSON、Parquet、JDBC、RDD等都可以執行查詢的,一套基於Spark計算引擎的查詢引擎。因此它是Spark的一個項目,只不過提供了針對Hive執行查詢的工功能而已,適合在一些使用Spark技術棧的大數據應用類系統中使用。

而Hive On Spark,是Hive的一個項目,它是將Spark作為底層的查詢引擎(不通過MapRece作為唯一的查詢引擎)。Hive On Spark,只適用於Hive,在可預見的未來,很有可能Hive默認的底層引擎就從MapRece切換為Spark了;適合於將原有的Hive數據倉庫以及數據統計分析替換為Spark引擎,作為全公司通用的大數據統計分析引擎。

Hive On Spark做了一些優化:
1、Map Join
Spark SQL默認對join是支持使用broadcast機制將小表廣播到各個節點上,以進行join的。但是問題是,這會給Driver和Worker帶來很大的內存開銷。因為廣播的數據要一直保留在Driver內存中。所以目前採取的是,類似乎MapRece的Distributed Cache機制,即提高HDFS replica factor的復制因子,以讓數據在每個計算節點上都有一個備份,從而可以在本地進行數據讀取。

2、Cache Table
對於某些需要對一張表執行多次操作的場景,Hive On Spark內部做了優化,即將要多次操作的表cache到內存中,以便於提升性能。但是這里要注意,並不是對所有的情況都會自動進行cache。所以說,Hive On Spark還有很多不完善的地方。

Hive QL語句 =>
語法分析 => AST =>
生成邏輯執行計劃 => Operator Tree =>
優化邏輯執行計劃 => Optimized Operator Tree =>
生成物理執行計劃 => Task Tree =>
優化物理執行計劃 => Optimized Task Tree =>
執行優化後的Optimized Task Tree

Ⅵ spark SQL和hive到底什麼關系

Hive是一種基於HDFS的數據倉庫,並且提供了基於SQL模型的,針對存儲了大數據的數據倉庫,進行分布式交互查詢的查詢引擎。

SparkSQL並不能完全替代Hive,它替代的是Hive的查詢引擎,SparkSQL由於其底層基於Spark自身的基於內存的特點,因此速度是Hive查詢引擎的數倍以上,Spark本身是不提供存儲的,所以不可能替代Hive作為數據倉庫的這個功能。

SparkSQL相較於Hive的另外一個優點,是支持大量不同的數據源,包括hive、json、parquet、jdbc等等。SparkSQL由於身處Spark技術堆棧內,基於RDD來工作,因此可以與Spark的其他組件無縫整合使用,配合起來實現許多復雜的功能。比如SparkSQL支持可以直接針對hdfs文件執行sql語句。

Ⅶ 可能是全網最詳細的 Spark Sql Aggregate 源碼剖析

縱觀 Spark Sql 源碼,聚合的實現是其中較為復雜的部分,本文希望能以例子結合流程圖的方式來說清楚整個過程。這里僅關注 Aggregate 在物理執行計劃相關的內容,之前的 parse、analyze 及 optimize 階段暫不做分析。在 Spark Sql 中,有一個專門的 Aggregation strategy 用來處理聚合,我們先來看看這個策略。

本文暫不討論 distinct Aggregate 的實現(有興趣的可以看看另一篇博文 https://www.jianshu.com/p/77e0a70db8cd ),我們來看看 AggUtils#planAggregateWithoutDistinct 是如何生成聚合的物理執行計劃的

創建聚合分為兩個階段:

AggregateExpression 共有以下幾種 mode:

Q:是否支持使用 hash based agg 是如何判斷的?

摘自我另一篇文章: https://www.jianshu.com/p/77e0a70db8cd

為了說明最常用也是最復雜的的 hash based agg,本小節暫時將示例 sql 改為

這樣就能進入 HashAggregateExec 的分支

構造函數主要工作就是對 groupingExpressions、aggregateExpressions、aggregateAttributes、resultExpressions 進行了初始化

在 enable code gen 的情況下,會調用 HashAggregateExec#inputRDDs 來生成 RDD,為了分析 HashAggregateExec 是如何生成 RDD 的,我們設置 spark.sql.codegen.wholeStage 為 false 來 disable code gen,這樣就會調用 HashAggregateExec#doExecute 來生成 RDD,如下:

可以看到,關鍵的部分就是根據 child.execute() 生成的 RDD 的每一個 partition 的迭代器轉化生成一個新的 TungstenAggregationIterator ,即 HashAggregateExec 生成的 RDD 的各個 partition。由於 TungstenAggregationIterator 涉及內容非常多,我們單開一大節來進行介紹。

此迭代器:

註:UnsafeKVExternalSorter 的實現可以參考:

UnsafeRow 是 InternalRow(表示一行記錄) 的 unsafe 實現,由原始內存(byte array)而不是 Java 對象支持,由三個區域組成:

使用 UnsafeRow 的收益:

構造函數的主要流程已在上圖中說明,需要注意的是:當內存不足時(畢竟每個 grouping 對應的 agg buffer 直接佔用內存,如果 grouping 非常多,或者 agg buffer 較大,容易出現內存用盡)會從 hash based aggregate 切換為 sort based aggregate(會 spill 數據到磁碟),後文會進行詳述。先來看看最關鍵的 processInputs 方法的實現

上圖中,需要注意的是:hashMap 中 get 一個 groupingKey 對應的 agg buffer 時,若已經存在該 buffer 則直接返回;若不存在,嘗試申請內存新建一個:

上圖中,用於真正處理一條 row 的 AggregationIterator#processRow 還需進一步展開分析。在此之前,我們先來看看 AggregateFunction 的分類

AggregateFunction 可以分為 DeclarativeAggregate 和 ImperativeAggregate 兩大類,具體的聚合函數均為這兩類的子類。

DeclarativeAggregate 是一類直接由 Catalyst 中的 Expressions 構成的聚合函數,主要邏輯通過調用 4 個表達式完成,分別是:

我們再次以容易理解的 Count 來舉例說明:

通常來講,實現一個基於 Expressions 的 DeclarativeAggregate 函數包含以下幾個重要的組成部分:

再來看看 AggregationIterator#processRow

AggregationIterator#processRow 會調用

生成用於處理一行數據(row)的函數

說白了 processRow 生成了函數才是直接用來接受一條 input row 來更新對應的 agg buffer,具體是根據 mode 及 aggExpression 中的 aggFunction 的類型調用其 updateExpressions 或 mergeExpressions 方法:

比如,對於 aggFunction 為 DeclarativeAggregate 類型的 Partial 下的 Count 來說就是調用其 updateExpressions 方法,即:

對於 Final 的 Count 來說就是調用其 mergeExpressions 方法,即:

對於 aggFunction 為 ImperativeAggregate 類型的 Partial 下的 Collect 來說就是調用其 update 方法,即:

對於 Final 的 Collect 來說就是調用其 merge 方法,即:

我們都知道,讀取一個迭代器的數據,是要不斷調用 hasNext 方法進行 check 是否還有數據,當該方法返回 true 的時候再調用 next 方法取得下一條數據。所以要知道如何讀取 TungstenAggregationIterator 的數據,就得分析其這兩個方法。

分為兩種情況,分別是:

Agg 的實現確實復雜,本文雖然篇幅已經很長,但還有很多方面沒有 cover 到,但基本最核心、最復雜的點都詳細介紹了,如果對於未 cover 的部分有興趣,請自行閱讀源碼進行分析~